zoukankan      html  css  js  c++  java
  • Springboot2.x整合RabbitMQ

    1、RabbitMQ介绍

    可参照RabbitMQ笔记

    2、接入配置

    pom依赖

    <!--amqp依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    配置文件

    server.port=8080
    
    spring.application.name=springboot-rabbitmq
    spring.rabbitmq.host=192.168.242.131
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # 开启发送确认
    spring.rabbitmq.publisher-confirms=true
    # 开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    # 开启ACK
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    3、一对一模式

      即一个生产者对一个消费者模式

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
    }

    消费者

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    消息生产者测试接口

        /**
         * 单条消息发送给单个队列,该队列只有一个消费者
         *
         * @return
         */
        @GetMapping(value = "send")
        public String send() {
            String content = "Date:" + System.currentTimeMillis();
            //发送默认交换机对应的的队列kinson
            amqpTemplate.convertAndSend("kinson", content);
            return content;
        }

    4、一对多模式

      即一个生产者对多个消费者,该模式下可以是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也可以是一个生产者将消息投递到多个队列里,此时消息是被复制处理。

    模式一:

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
    }

    消费者1

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    消费者2

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver2 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver2 :" + msg);
        }
    }

    消息生产者测试接口

        /**
         * 发送多条消息到一个队列,该队列有多个消费者
         *
         * @return
         */
        @GetMapping(value = "sendMore")
        public String sendMore() {
            List<String> result = new ArrayList<String>();
            //发送10条数据
            for (int i = 0; i < 10; i++) {
                String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
                //发送默认交换机对应的的队列kinson,此时有两个消费者MyReceiver1和MyReceiver2,每条消息只会被消费一次
                amqpTemplate.convertAndSend("kinson", content);
                result.add(content);
            }
            return String.join("<br/>", result);
        }

    模式二:

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
        @Bean
        public Queue kinsonQueue2() {
            return new Queue("kinson2");
        }
    }

    kinson队列消费者

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    kinson2队列消费者

    @Component
    //监听队列kinson2
    @RabbitListener(queues = {"kinson2"})
    public class MyReceiver3 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver3 :" + msg);
        }
    }

    消息生产者测试接口

      /**
         * 发送多条消息到多个队列
         *
         * @return
         */
        @GetMapping(value = "sendMoreQueue")
        public String sendMoreQueue() {
            List<String> result = new ArrayList<String>();
            //发送10条数据
            for (int i = 0; i < 10; i++) {
                String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
                //发送默认交换机对应的的队列kinson
                amqpTemplate.convertAndSend("kinson", content);
                //发送默认交换机对应的的队列kinson2
                amqpTemplate.convertAndSend("kinson2", content);
                result.add(content);
            }
            return String.join("<br/>", result);
        }

    相应测试结果请自测

    5、ACK消息确认

    配置文件加入相应配置

    # 开启发送确认
    spring.rabbitmq.publisher-confirms=true
    # 开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    # 开启ACK
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    配置类,使用Fanout类型的Exchange,主要是设置队列,交换机及绑定

    @Configuration
    public class RabbitMqFanoutACKConfig {
    
        @Bean
        public Queue ackQueue() {
            return new Queue("ackQueue");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        @Bean
        Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(ackQueue).to(fanoutExchange);
        }
    
    }

    消息发送服务

    @Service
    public class AckSenderService implements RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
    } /** * 消息发送 */ public void send() { final String content = "现在时间是" + LocalDateTime.now(ZoneId.systemDefault()); //设置返回回调 rabbitTemplate.setReturnCallback(this); //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息发送成功!"); } else { System.out.println("消息发送失败," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

    消息消费者

    @Component
    @RabbitListener(queues = {"ackQueue"})
    public class MyAckReceiver {
    
        @RabbitHandler
        public void process(String sendMsg, Channel channel, Message message) {
    
            System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                    + LocalDateTime.now(ZoneId.systemDefault()));
    
            try {
                //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
                //否则消息服务器以为这条消息没处理掉,后续还会再发
                //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                System.out.println("process success");
            } catch (Exception e) {
                System.out.println("process fail");
                e.printStackTrace();
            }
    
        }
    }

    测试访问接口

       /**
         * @return
         */
        @GetMapping(value = "ackSend")
        public String ackSend() {
            senderService.send();
    
            return "ok";
        }

    测试将Consumer确认代码注释掉,即

    @Component
    @RabbitListener(queues = {"ackQueue"})
    public class MyAckReceiver {
    
        @RabbitHandler
        public void process(String sendMsg, Channel channel, Message message) {
    
            System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                    + LocalDateTime.now(ZoneId.systemDefault()));
    
            try {
                //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
                //否则消息服务器以为这条消息没处理掉,后续还会再发
                //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
                //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                System.out.println("process success");
            } catch (Exception e) {
                System.out.println("process fail");
                e.printStackTrace();
            }
    
        }
    }

    此时访问测试接口,可以看到当消息发送完被消费掉之后,队列的状态变为unacked。

    当停掉服务时,unacked状态变为Ready

    再重新启动服务时会重新发送消息

    6、事务机制

    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
    //声明启动事务模式
    channel.txSelect();
    //提交事务
    channel.txComment();
    //回滚事务
    channel.txRollback();

    消息发送示例

    public void publish()
                throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost("/");
            factory.setHost(host);
            factory.setPort(port);
            Connection conn = factory.newConnection();
            // 创建信道
            Channel channel = conn.createChannel();
            // 声明队列
            channel.queueDeclare(TX_QUEUE, true, false, false, null);
    
            try {
    
                long startTime = System.currentTimeMillis();
    
                for (int i = 0; i < 10; i++) {
                    // 声明事务
                    channel.txSelect();
                    String message = String.format("时间 => %s", System.currentTimeMillis());
                    // 发送消息
                    channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("UTF-8"));
                    // 提交事务
                    channel.txCommit();
                }
    
                long endTime = System.currentTimeMillis();
    
                System.out.println("事务模式,发送10条数据,执行花费时间:" + (endTime - startTime) + "s");
    
            } catch (Exception e) {
                channel.txRollback();
            } finally {
                channel.close();
                conn.close();
            }
        }

    消息消费示例

    public void consume() throws IOException, TimeoutException, InterruptedException {
    
            Connection conn = RabbitMqConnFactoryUtil.getRabbitConn();
            Channel channel = conn.createChannel();
            channel.queueDeclare(TX_QUEUE, true, false, false, null);
            // 声明事务
            channel.txSelect();
            try {
                //单条消息获取进行消费
                GetResponse resp = channel.basicGet(TX_QUEUE, false);
                String message = new String(resp.getBody(), "UTF-8");
                System.out.println("收到消息:" + message);
                //消息拒绝
                // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true);
                // 消息确认
                channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
                // 提交事务
                channel.txCommit();
            } catch (Exception e) {
                // 回滚事务
                channel.txRollback();
            } finally {
                //关闭通道、连接
                channel.close();
                conn.close();
            }
        }

    7、Confirm消息确认

    Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,Confirm的三种实现方式:
    //方式一:普通发送方确认模式
    channel.waitForConfirms();
    //方式二:批量确认模式
    channel.waitForConfirmsOrDie();
    //方式三:异步监听发送方确认模式
    channel.addConfirmListener();

    消息发布示例

    public void publish() throws IOException, TimeoutException, InterruptedException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost("/");
            factory.setHost(host);
            factory.setPort(port);
            Connection conn = factory.newConnection();
            // 创建信道
            Channel channel = conn.createChannel();
            // 声明队列
            channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null);
    
            long startTime = System.currentTimeMillis();
    
            for (int i = 0; i < 10; i++) {
                // 开启发送方确认模式
                channel.confirmSelect();
                String message = String.format("时间 => %s", System.currentTimeMillis());
                channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8"));
            }
    
            //添加确认监听器
            channel.addConfirmListener(new ConfirmListener() {
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag);
                }
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                }
            });
    
            long endTime = System.currentTimeMillis();
    
            System.out.println("执行花费时间:" + (endTime - startTime) + "s");
    
        }

    RabbitMQ简单示例源码参照Github

  • 相关阅读:
    Fiddler——若网模拟测试
    MySQL——concat / instr函数
    Centos7 + docker + Jenkins搭建及测试
    常用正则表达式
    mariadb配置文件优化参数
    Python小技巧整理
    部署zabbix3.2.7,升级到3.4、proxy部署
    zabbix3.4.x添加短信报警
    个人总结OLinux上安装oracle11G Data Guard
    LRM-00109: could not open parameter file '/u01/app/oracle/product/12.1.0/db_1/dbs/initepps.ora'
  • 原文地址:https://www.cnblogs.com/kingsonfu/p/10599608.html
Copyright © 2011-2022 走看看