zoukankan      html  css  js  c++  java
  • Spring Boot集成RabbitMQ实践

    一、创建生产者服务

    1、创建生产者服务 rabbit-producer

         spring boot版本为 2.1.16.RELEASE

    2、pom.xml 

    引入spring-boot-starter-amqp

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    

      

    3、配置application.properties

    server.servlet.context-path=/
    server.port=8002
    
    # 集群用逗号分割
    spring.rabbitmq.addresses=xx.xx.xx.xx:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # 使用启动消息确认模式
    spring.rabbitmq.publisher-confirms=true
    
    #设置return消息模式,注意要与mandatory一起去配合使用
    #spring.rabbitmq.publisher-returns=true
    #spring.rabbitmq.template.mandatory=true
    
    
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=non_null
    

      

    4、发送

    @Component
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
         */
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    
            /**
             * @param correlationData 作为一个唯一的标识
             * @param ack broker是否落盘成功
             * @param cause 失败的异常信息
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("消息ACK结果:" + ack + ",correlationData: " +correlationData.getId())  ;
            }
        };
    
    
        /**
         * 对外发送消息的方法
         * @param message 具体的消息内容
         * @param properties 额外的附加属性
         * @throws Exception
         */
        public void send(Object message, Map<String,Object> properties) throws Exception{
            MessageHeaders mhs = new MessageHeaders(properties);
            Message<?>  msg = MessageBuilder.createMessage(message, mhs);
            rabbitTemplate.setConfirmCallback(confirmCallback);
    
            //指定业务唯一的ID
            String uuid = UUID.randomUUID().toString();
            System.out.println("生成业务唯一Id=" + uuid);
            CorrelationData correlationData = new CorrelationData(uuid);
    
            MessagePostProcessor mpp = new MessagePostProcessor() {
                @Override
                public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                    System.out.println("postProcessMessage message: " +message);
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("myexchange1", "myroutingkey.1", msg, mpp, correlationData);
    
        }
    }
    

      

    5、测试发送消息

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitProducerApplicationTests {
    
        @Autowired
        private  RabbitSender rabbitSender;
    
        @Test
        public void testSender() throws  Exception {
            Map<String,Object> properties = new HashMap<>();
            properties.put("name","zhangsan");
            properties.put("age","18");
            rabbitSender.send("hello rabbit", properties);
    
            Thread.sleep(10000);
        }
    
    
    
    }
    

      

    二、RabbitMQ消费者服务 

    1、创建RabbitMQ消费者服务 rabbit-consumer

         spring boot版本为 2.1.16.RELEASE

    2、pom.xml 

    引入spring-boot-starter-amqp

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    

      

    3、配置application.properties

    server.servlet.context-path=/
    server.port=8002
    
    # 集群用逗号分割
    spring.rabbitmq.addresses=xx.xx.xx.xx:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # 表示消费者消费成功消息以后需要手工的信息签收(ack),默认为auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    spring.rabbitmq.listener.simple.prefetch=1
    
    # RabbitListener 相关配置
    spring.rabbitmq.listener.order.exchange.name=myexchange1
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.key=myroutingkey.*
    
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=non_null
    

      

    4、创建接收者类

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @description: 消息接收者
     * @author:
     * @create: 2020-08-01 09:35
     */
    @Component
    public class RabbitReceiver {
    
        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myqueue1", durable = "true"),
                        exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}",
                        durable= "${spring.rabbitmq.listener.order.exchange.durable}",
                        type = "${spring.rabbitmq.listener.order.exchange.type}",
                        ignoreDeclarationExceptions = "true"),
                        key = "${spring.rabbitmq.listener.order.exchange.key}"
                    )
             )
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception{
            // 步骤1、收到消息后进行业务端消费处理
            System.out.println("消费消息" + message.getPayload());
    
            //步骤2、处理成功后,获取deliveryTag,并进行手工ACK操作,因为配置文件配置的是手工签收模式
            // spring.rabbitmq.listener.simple.acknowledge-mode=manual
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
    

    RabbitListener相关属性配置在属性文件中。

    消费者采用手工配置 channel.basicAck

      

  • 相关阅读:
    一个爬虫的练习(妹子图)
    安装模块出现的编译问题(解决)
    基于套接字通信的简单练习(FTP)
    Python3 之选课系统
    数据爬取后台(PHP+Python)联合作战
    让pip 使用国内镜像源
    为啥学蛇和python10年后的变化
    模拟登陆百度以及Selenium 的基本用法
    冒泡排序及其效率分析(无聊搞来玩玩)
    LLVM编译器
  • 原文地址:https://www.cnblogs.com/linlf03/p/13413889.html
Copyright © 2011-2022 走看看