zoukankan      html  css  js  c++  java
  • RabbitMQ整合springboot

    一、springboot版本和依赖

     

    • springboot 版本 2.1.5
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    • dependencies
    <dependencies>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
        </dependencies>

    二、生产端

    1、yml  文件配置  

    server:
      port: 8001
      servlet:
        context-path: /
    spring:
      rabbitmq:
        addresses: ip:5672,ip:5672,ip:5672
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 15000
    
        publisher-confirms: true
        publisher-returns: true
        template:
          mandatory: true
    
    
      application:
        name: rabbit-producer
      http:
        encoding:
          charset: UTF-8
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: NON_NULL
    
    rabbitmq-exchange: exchange-1
    rabbitmq-routingKey: springboot.abc
    1. address   server的ip地址加上端口号
    2. username  passowrd 账号密码
    3. virtual-host    最上层的一个域  ,例如  /order    /logistics
    4. 连接超时时间
    • 生产端的相关配置
    1. publisher-confirms  是否开启消息确认模式,举例:生产者发送消息到 broker(mq) ,我不确定我的消息是否100%已经投递到mq 中,我们会进行线程监听,mq 会返回一个成功或者是失败的情况
    2. publisher-returns  是否开启发布者退货模式,举例:生产者发送routingkey:  spring.xxx  ,queue routingkey:为 springboot.xxx 。那么不匹配路由规则。publisher-returns 设置为false 的话,这条消息就丢掉了,消失了。设置为true的话,会将消息 执行到我们指定的一对   exchange  和 queue 上。 需要和 mandatory 一起使用
    3. mandatory     是否开启强制性消息

    2、编写发送消息方法

    package com.example.producer.component;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @Author: qiuj
     * @Description:
     * @Date: 2020-05-31 11:57
     */
    @Component
    public class Sender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Value("${rabbitmq-exchange}")
        private String exchange;
        @Value("${rabbitmq-routingKey}")
        private String routingKey;
    
        /**
         * 这里就是确认消息的回调监听接口,用于确认消息是否被broker 所收到
         */
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("唯一id:" +  correlationData);
                System.out.println("消息是否成功投递" + ack );
                System.out.println("如果失败则会返回错误消息" + cause);
            }
        };
    
    
        public void sends (Object msg, Map<String,Object> properties) {
            //  第一步将消息包装成boot 支持的方式
            MessageHeaders messageHeaders = new MessageHeaders(properties);
            Message<?> message = MessageBuilder.createMessage(msg,messageHeaders);
            //  指定唯一id
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            //  回调方法
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                    System.out.println("在发送消息之前的前置方法" + message);
                    return message;
                }
            };
            //  前置方法
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.convertAndSend(exchange,
                    routingKey,
                    message,
                    messagePostProcessor,
                    correlationData);
        }
    }
    

    三、消费端

    1、yml  文件配置  

    server:
      port: 8002
    spring:
      rabbitmq:
        addresses: ip:5672,ip:5672,ip:5672
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 15000
        listener:
          simple:
            acknowledge-mode: manual
            concurrency: 5
            max-concurrency: 10
            prefetch: 2
    
      application:
        name: rabbit-consumer
      http:
        encoding:
          charset: UTF-8
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: NON_NULL
    
    
    queue-name: queue-1
    queue-durable: true
    exchange-name: exchange-1
    exchange-topic: topic
    exchange-durable: true
    exchange-ignoreDeclarationExceptions: true
    routingkey: springboot.*
    • 消费端的相关配置
    1. acknowledge-mode 默认auto ,也就是自动签收消息,生产环境不建议,我们设置为 manual  手工的进行签收
    2. concurrency   max-concurrency  监听器调用线程的最小数量  和最大数量
    3. prefetch     在单个请求中处理的消息个数,开启限流,指定每次处理消息最多只能处理2条消息

    2、编写接受消息方法

    package com.example.consumer.component;
    
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * @Author: qiuj
     * @Description:
     * @Date: 2020-05-30 19:29
     */
    @Component
    public class RabbitReceive {
    
    
        /**
         * 组合使用监听
         *  @RabbitListener @QueueBinding   @Queue  @Exchange
         * @param message
         * @param channel
         * @throws IOException
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "${queue-name}",durable = "${queue-durable}"),
                exchange = @Exchange(
                        value = "${exchange-name}",
                        type = "${exchange-topic}",
                        durable = "${exchange-durable}",
                        ignoreDeclarationExceptions = "${exchange-ignoreDeclarationExceptions}"
                ),
                key = "${routingkey}"
        ))
        @RabbitHandler
        public void onMessage (Message message, Channel channel) throws IOException {
            //  1:收到消息以后进行业务端处理
            System.out.println("消费消息:" + message.getPayload());
            //  2:处理成功之后    获取deliveryTag   并进行手工的ACK操作,因为我们配置文件里配置的是   手工签收
            //  spring.rabbitmq.listener.simple.acknowiedge-mode=manual
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag,false);
    
        }
    
    
    }
    

    四、 源码

    1、生产端源码

    2、消费端源码

  • 相关阅读:
    linux系统调优工具
    搭建ceph分布式文件系统
    ansible管理windows主机
    jenkins结合ansible发布
    Linux系统安全配置
    tomcat 的安全配置预防后台被攻击
    【9】添加主页日志列表展示
    【8】添加新建/编辑博客逻辑
    【7】使用css/js/html模板来实现一个注册、登录和管理的功能
    Ubuntu下给Sublime Text 3添加用python3运行文件
  • 原文地址:https://www.cnblogs.com/blogspring/p/14191745.html
Copyright © 2011-2022 走看看