zoukankan      html  css  js  c++  java
  • Springboot2.x整合RabbitMQ

    1、RabbitMQ介绍

    可参照RabbitMQ笔记

    2、接入配置

    pom依赖

    <!--amqp依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    配置文件

    server.port=8080
    
    spring.application.name=springboot-rabbitmq
    spring.rabbitmq.host=192.168.242.131
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # 开启发送确认
    spring.rabbitmq.publisher-confirms=true
    # 开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    # 开启ACK
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    3、一对一模式

      即一个生产者对一个消费者模式

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
    }

    消费者

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    消息生产者测试接口

        /**
         * 单条消息发送给单个队列,该队列只有一个消费者
         *
         * @return
         */
        @GetMapping(value = "send")
        public String send() {
            String content = "Date:" + System.currentTimeMillis();
            //发送默认交换机对应的的队列kinson
            amqpTemplate.convertAndSend("kinson", content);
            return content;
        }

    4、一对多模式

      即一个生产者对多个消费者,该模式下可以是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也可以是一个生产者将消息投递到多个队列里,此时消息是被复制处理。

    模式一:

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
    }

    消费者1

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    消费者2

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver2 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver2 :" + msg);
        }
    }

    消息生产者测试接口

        /**
         * 发送多条消息到一个队列,该队列有多个消费者
         *
         * @return
         */
        @GetMapping(value = "sendMore")
        public String sendMore() {
            List<String> result = new ArrayList<String>();
            //发送10条数据
            for (int i = 0; i < 10; i++) {
                String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
                //发送默认交换机对应的的队列kinson,此时有两个消费者MyReceiver1和MyReceiver2,每条消息只会被消费一次
                amqpTemplate.convertAndSend("kinson", content);
                result.add(content);
            }
            return String.join("<br/>", result);
        }

    模式二:

    配置类

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public Queue kinsonQueue() {
            return new Queue("kinson");
        }
    
        @Bean
        public Queue kinsonQueue2() {
            return new Queue("kinson2");
        }
    }

    kinson队列消费者

    @Component
    //监听队列kinson
    @RabbitListener(queues = {"kinson"})
    public class MyReceiver1 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver1 :" + msg);
        }
    }

    kinson2队列消费者

    @Component
    //监听队列kinson2
    @RabbitListener(queues = {"kinson2"})
    public class MyReceiver3 {
    
        @RabbitHandler
        public void receiver(String msg) {
            System.out.println("MyReceiver3 :" + msg);
        }
    }

    消息生产者测试接口

      /**
         * 发送多条消息到多个队列
         *
         * @return
         */
        @GetMapping(value = "sendMoreQueue")
        public String sendMoreQueue() {
            List<String> result = new ArrayList<String>();
            //发送10条数据
            for (int i = 0; i < 10; i++) {
                String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
                //发送默认交换机对应的的队列kinson
                amqpTemplate.convertAndSend("kinson", content);
                //发送默认交换机对应的的队列kinson2
                amqpTemplate.convertAndSend("kinson2", content);
                result.add(content);
            }
            return String.join("<br/>", result);
        }

    相应测试结果请自测

    5、ACK消息确认

    配置文件加入相应配置

    # 开启发送确认
    spring.rabbitmq.publisher-confirms=true
    # 开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    # 开启ACK
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    配置类,使用Fanout类型的Exchange,主要是设置队列,交换机及绑定

    @Configuration
    public class RabbitMqFanoutACKConfig {
    
        @Bean
        public Queue ackQueue() {
            return new Queue("ackQueue");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        @Bean
        Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(ackQueue).to(fanoutExchange);
        }
    
    }

    消息发送服务

    @Service
    public class AckSenderService implements RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
    } /** * 消息发送 */ public void send() { final String content = "现在时间是" + LocalDateTime.now(ZoneId.systemDefault()); //设置返回回调 rabbitTemplate.setReturnCallback(this); //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息发送成功!"); } else { System.out.println("消息发送失败," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

    消息消费者

    @Component
    @RabbitListener(queues = {"ackQueue"})
    public class MyAckReceiver {
    
        @RabbitHandler
        public void process(String sendMsg, Channel channel, Message message) {
    
            System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                    + LocalDateTime.now(ZoneId.systemDefault()));
    
            try {
                //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
                //否则消息服务器以为这条消息没处理掉,后续还会再发
                //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                System.out.println("process success");
            } catch (Exception e) {
                System.out.println("process fail");
                e.printStackTrace();
            }
    
        }
    }

    测试访问接口

       /**
         * @return
         */
        @GetMapping(value = "ackSend")
        public String ackSend() {
            senderService.send();
    
            return "ok";
        }

    测试将Consumer确认代码注释掉,即

    @Component
    @RabbitListener(queues = {"ackQueue"})
    public class MyAckReceiver {
    
        @RabbitHandler
        public void process(String sendMsg, Channel channel, Message message) {
    
            System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                    + LocalDateTime.now(ZoneId.systemDefault()));
    
            try {
                //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
                //否则消息服务器以为这条消息没处理掉,后续还会再发
                //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
                //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                System.out.println("process success");
            } catch (Exception e) {
                System.out.println("process fail");
                e.printStackTrace();
            }
    
        }
    }

    此时访问测试接口,可以看到当消息发送完被消费掉之后,队列的状态变为unacked。

    当停掉服务时,unacked状态变为Ready

    再重新启动服务时会重新发送消息

    6、事务机制

    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
    //声明启动事务模式
    channel.txSelect();
    //提交事务
    channel.txComment();
    //回滚事务
    channel.txRollback();

    消息发送示例

    public void publish()
                throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost("/");
            factory.setHost(host);
            factory.setPort(port);
            Connection conn = factory.newConnection();
            // 创建信道
            Channel channel = conn.createChannel();
            // 声明队列
            channel.queueDeclare(TX_QUEUE, true, false, false, null);
    
            try {
    
                long startTime = System.currentTimeMillis();
    
                for (int i = 0; i < 10; i++) {
                    // 声明事务
                    channel.txSelect();
                    String message = String.format("时间 => %s", System.currentTimeMillis());
                    // 发送消息
                    channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("UTF-8"));
                    // 提交事务
                    channel.txCommit();
                }
    
                long endTime = System.currentTimeMillis();
    
                System.out.println("事务模式,发送10条数据,执行花费时间:" + (endTime - startTime) + "s");
    
            } catch (Exception e) {
                channel.txRollback();
            } finally {
                channel.close();
                conn.close();
            }
        }

    消息消费示例

    public void consume() throws IOException, TimeoutException, InterruptedException {
    
            Connection conn = RabbitMqConnFactoryUtil.getRabbitConn();
            Channel channel = conn.createChannel();
            channel.queueDeclare(TX_QUEUE, true, false, false, null);
            // 声明事务
            channel.txSelect();
            try {
                //单条消息获取进行消费
                GetResponse resp = channel.basicGet(TX_QUEUE, false);
                String message = new String(resp.getBody(), "UTF-8");
                System.out.println("收到消息:" + message);
                //消息拒绝
                // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true);
                // 消息确认
                channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
                // 提交事务
                channel.txCommit();
            } catch (Exception e) {
                // 回滚事务
                channel.txRollback();
            } finally {
                //关闭通道、连接
                channel.close();
                conn.close();
            }
        }

    7、Confirm消息确认

    Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,Confirm的三种实现方式:
    //方式一:普通发送方确认模式
    channel.waitForConfirms();
    //方式二:批量确认模式
    channel.waitForConfirmsOrDie();
    //方式三:异步监听发送方确认模式
    channel.addConfirmListener();

    消息发布示例

    public void publish() throws IOException, TimeoutException, InterruptedException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost("/");
            factory.setHost(host);
            factory.setPort(port);
            Connection conn = factory.newConnection();
            // 创建信道
            Channel channel = conn.createChannel();
            // 声明队列
            channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null);
    
            long startTime = System.currentTimeMillis();
    
            for (int i = 0; i < 10; i++) {
                // 开启发送方确认模式
                channel.confirmSelect();
                String message = String.format("时间 => %s", System.currentTimeMillis());
                channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8"));
            }
    
            //添加确认监听器
            channel.addConfirmListener(new ConfirmListener() {
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag);
                }
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                }
            });
    
            long endTime = System.currentTimeMillis();
    
            System.out.println("执行花费时间:" + (endTime - startTime) + "s");
    
        }

    RabbitMQ简单示例源码参照Github

  • 相关阅读:
    HBase 高性能加入数据
    Please do not register multiple Pages in undefined.js 小程序报错的几种解决方案
    小程序跳转时传多个参数及获取
    vue项目 调用百度地图 BMap is not defined
    vue生命周期小笔记
    解决小程序背景图片在真机上不能查看的问题
    vue项目 菜单侧边栏随着右侧内容盒子的高度实时变化
    vue项目 一行js代码搞定点击图片放大缩小
    微信小程序进行地图导航使用地图功能
    小程序报错Do not have xx handler in current page的解决方法
  • 原文地址:https://www.cnblogs.com/kingsonfu/p/10599608.html
Copyright © 2011-2022 走看看