zoukankan      html  css  js  c++  java
  • 01 Rabbit MQ 的初步使用

    Rabbit MQ 的初步使用

    一、先来说说 Java 代码中的初步集成吧

    0. 业务场景模拟

    1. 在默认的虚拟主机(virtual-host)/下,创建一个名为testTopic类型的交换机(exchange),并设置其属性为不持久化(durable)、不自动删除(autoDelete)。

    2. 同时创建了一个名为hello的队列(Queue),并且设置不持久化(durable)、不排外的(exclusive)、不自动删除(autoDelete)。

    3. 将这个Queueexchange绑定(binding)起来。并同时指定一个routingKey。这个key的作用就是让流进Exchange的消息流进指定的Queue。topic类型的exchange使用全名routingKey的话,就相当于点对点的发送接收了(初步认识rabbit mq感觉是这样子,后续继续深入学习时,发现这个观点不对的话,会改正的)。

    1. 项目依赖

    Maven pom文件里添加坐标。

    	<!-- Rabbit MQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    

    2. 在application.yml文件里配置一些参数

    # spring配置
    spring:
      rabbitmq:
        host: 10.0.43.27
        port: 5673
        username: guest
        password: guest
        virtual-host: /
        # 这里配置消费者接收消息后,需要进行 ack 确认,这个消息才算被消费掉
        listener:
          simple:
            acknowledge-mode: manual
    

    3. 写配置类

    package com.unidata.cloud.logservice.manager.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 
     * @author demo@demo.com
     * @date 2019/11/19 15:21
     */
    @Configuration
    public class RabbitmqConfig {
    
        @Bean
        public Queue helloQueue() {
            return new Queue("hello",false,false,false);
        }
    
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange("test", false, false);
        }
    
        @Bean
        public Binding bind(Queue queue,TopicExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("demo_key");
        }
    }
    

    4. 发送消息

    package com.unidata.cloud.logservice.manager.rabbitmq;
    
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author demo@demo.com
     * @date 2019/11/19 17:51
     */
    @RestController
    @Slf4j
    @AllArgsConstructor
    public class ProduceController {
    
        private AmqpTemplate rabbitTemplate;
    
        @RequestMapping(value = "/send")
        public String sendMsg(String msg){
            rabbitTemplate.convertAndSend("hello", "{"name":"demo01"}");
            return "ok";
        }
    }
    

    5. 监听消息

    package com.unidata.cloud.logservice.manager.rabbitmq;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * @author demo@demo.com
     * @date 2019/11/19 17:52
     */
    @Component
    @Slf4j
    public class ReceiveListener {
    
        @Autowired
        RabbitTemplate template;
    
        @RabbitHandler
        @RabbitListener(queues = "hello")
        public void processMsg(Message message, Channel channel) throws IOException {
            try {
                byte[] body = message.getBody();
                String result = new String(body, StandardCharsets.UTF_8);
                JSONObject jsonObject = JSON.parseObject(result);
                System.out.println(jsonObject);
                // you can do something here.
    				
                // 进行消费确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                log.info("消息消费成功");
            } catch (Exception e) {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                log.error(e.getLocalizedMessage());
            }
        }
    }
    

    6. 测试

    1. 如图,发送消息

    1. 查看控制台,已经消费成功

    7. 初体验总结

    上边六个步骤,进行了一个简单的,生产者消息发送,消费者消息消费的过程。

    当运行这个项目时,会根据RabbitmqConfig类里的配置信息自动创建出 exchange 和 queue 。

    题外:

  • 相关阅读:
    09 Django组件之用户认证组件
    二叉树的三种遍历(非递归)
    CoderForce 141C-Queue (贪心+构造)
    CoderForce 140C-New Year Snowmen(贪心)
    UVA-1663 Purifying Machine (最大匹配数)
    UVA-10801 Lift Hopping (最短路)
    UVA-1660 Cable TV Network (最小割)
    UVA-820 Internet Bandwidth (最大流)
    UVA-1336 Fixing the Great Wall(区间DP)
    棋盘分割(二维区间DP)
  • 原文地址:https://www.cnblogs.com/kjgym/p/11900792.html
Copyright © 2011-2022 走看看