zoukankan      html  css  js  c++  java
  • RabbitMQ在SpringBoot中的使用

    SpringBoot应用可以完成自动配置及依赖注入——可以通过Spring直接提供与MQ的连接对象

    6.1 消息生产者

    • 创建SpringBoot应用,添加依赖

    • 配置application.yml

      

    server:
      port: 9001
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.96.11.185
        port: 5672
        virtual-host: host1
        username: ytao
        password: admin123

    发送消息

    @Service
    public class TestService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void sendMsg(String msg){
    
            //1. 发送消息到队列
            amqpTemplate.convertAndSend("queue1",msg);
    
            //2. 发送消息到交换机(订阅交换机)
            amqpTemplate.convertAndSend("ex1","",msg);
    
            //3. 发送消息到交换机(路由交换机)
            amqpTemplate.convertAndSend("ex2","a",msg);
            
        }
    
    }

    6.2 消息消费者

    • 创建项目添加依赖

    • 配置yml

    • 接收消息

    @Service
    //@RabbitListener(queues = {"queue1","queue2"})
    @RabbitListener(queues = "queue1")
    public class ReceiveMsgService {
    
        @RabbitHandler
        public void receiveMsg(String msg){
            System.out.println("接收MSG:"+msg);
        }
    
        //@RabbitHandler
        //public void receiveMsg(byte[] bs){
        //
        //}
    
    }

    二、使用RabbitMQ传递对象

    RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

    2.1 使用序列化对象

      

      要求:

    •   传递的对象实现序列化接口

    • 传递的对象的包名、类名、属性名必须一致

    消息提供者

    @Service
    public class MQService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void sendGoodsToMq(Goods goods){
            //消息队列可以发送 字符串、字节数组、序列化对象
            amqpTemplate.convertAndSend("","queue1",goods);
        }
    
    }

    消息消费者

    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {
    
        @RabbitHandler
        public void receiveMsg(Goods goods){
            System.out.println("Goods---"+goods);
        }
    
    }

    2.2 使用序列化字节数组

    要求:

    • 传递的对象实现序列化接口

    • 传递的对象的包名、类名、属性名必须一致

    消息提供者

    @Service
    public class MQService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void sendGoodsToMq(Goods goods){
            //消息队列可以发送 字符串、字节数组、序列化对象
            byte[] bytes = SerializationUtils.serialize(goods);
            amqpTemplate.convertAndSend("","queue1",bytes);
        }
    
    }

    消息消费者

    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {
    
        @RabbitHandler
        public void receiveMsg(byte[] bs){
            Goods goods = (Goods) SerializationUtils.deserialize(bs);
            System.out.println("byte[]---"+goods);
        }
    
    }

    2.3 使用JSON字符串传递

    要求:对象的属性名一直

    • 消息提供者

    @Service
    public class MQService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
            //消息队列可以发送 字符串、字节数组、序列化对象
            ObjectMapper objectMapper = new ObjectMapper();
            String msg = objectMapper.writeValueAsString(goods);
            amqpTemplate.convertAndSend("","queue1",msg);
        }
    
    }

    消息消费者

    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {
    
        @RabbitHandler
        public void receiveMsg(String msg) throws JsonProcessingException {
            ObjectMapper objectMapper = new ObjectMapper();
            Goods goods = objectMapper.readValue(msg,Goods.class);
            System.out.println("String---"+msg);
        }
    }

    三、基于Java的交换机与队列创建

    我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过Java代码来完成创建

    3.1 普通Maven项目交换机及队列创建

    • 使用Java代码新建队列

    • //1.定义队列 (使用Java代码在MQ中新建一个队列)
      //参数1:定义的队列名称
      //参数2:队列中的数据是否持久化(如果选择了持久化)
      //参数3: 是否排外(当前队列是否为当前连接私有)
      //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
      //参数5:设置当前队列的参数
      channel.queueDeclare("queue7",false,false,false,null);
    • 新建交换机
    • //定义一个“订阅交换机”
      channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
      //定义一个“路由交换机”
      channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
    • 绑定队列到交换机
    • //绑定队列
      //参数1:队列名称
      //参数2:目标交换机
      //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
      channel.queueBind("queue7","ex4","k1");
      channel.queueBind("queue8","ex4","k2");

    3.2 SpringBoot应用中通过配置完成队列的创建

    @Configuration
    public class RabbitMQConfiguration {
    
        //声明队列
        @Bean
        public Queue queue9(){
            Queue queue9 = new Queue("queue9");
            //设置队列属性
            return queue9;
        }
        @Bean
        public Queue queue10(){
            Queue queue10 = new Queue("queue10");
            //设置队列属性
            return queue10;
        }
    
        //声明订阅模式交换机
        @Bean
        public FanoutExchange ex5(){
            return new FanoutExchange("ex5");
        }
    
        //声明路由模式交换机
        @Bean
        public DirectExchange ex6(){
            return new DirectExchange("ex6");
        }
    
        //绑定队列
        @Bean
        public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
            return BindingBuilder.bind(queue9).to(ex6).with("k1");
        }
        @Bean
        public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
            return BindingBuilder.bind(queue10).to(ex6).with("k2");
        }
    }

    四、消息的可靠性

    4.1 RabbitMQ事务

    当在消息发送过程中添加了事务,处理效率降低几十倍甚至上百倍

    channel.txSelect();  //开启事务
    try{
        channel.basicPublish("ex4", "k1", null, msg.getBytes());
        System.out.println("发送:" + msg);
        channel.txCommit(); //提交事务
    }catch (Exception e){
        channel.txRollback(); //事务回滚
    }

    4.2 RabbitMQ消息确认和return机制

    消息确认机制:确认消息提供者是否成功发送消息到交换机

    return机制:确认消息是否成功的从交换机分发到队列

     

     

     

    4.2.1 普通Maven项目的消息确认
    • 普通confirm方式

    • //1.发送消息之前开启消息确认
      channel.confirmSelect();
      
      channel.basicPublish("ex1", "a", null, msg.getBytes());
      
      //2.接收消息确认
      boolean b = channel.waitForConfirms(); 
      
      System.out.println("发送:" +(b?"成功":"失败"));
    • 批量confirm方式
    • //1.发送消息之前开启消息确认
      channel.confirmSelect();
      
      //2.批量发送消息
      for (int i=0 ; i<10 ; i++){
          channel.basicPublish("ex1", "a", null, msg.getBytes());
      }
      
      //3.接收批量消息确认:发送的所有消息中,如果有一条是失败的,则所有消息发送直接失败,抛出IO异常
      boolean b = channel.waitForConfirms(); 
    • 异步confirm方式
    • //发送消息之前开启消息确认
      channel.confirmSelect();
      
      //批量发送消息
      for (int i=0 ; i<10 ; i++){
          channel.basicPublish("ex1", "a", null, msg.getBytes());
      }
      
      //假如发送消息需要10s,waitForConfirms会进入阻塞状态
      //boolean b = channel.waitForConfirms();
      
      //使用监听器异步confirm
      channel.addConfirmListener(new ConfirmListener() {
          //参数1: long l  返回消息的表示
          //参数2: boolean b 是否为批量confirm
          public void handleAck(long l, boolean b) throws IOException {
              System.out.println("~~~~~消息成功发送到交换机");
          }
          public void handleNack(long l, boolean b) throws IOException {
              System.out.println("~~~~~消息发送到交换机失败");
          }
      });

      

    4.2.2 普通Maven项目的return机制
    • 添加return监听器

    • 发送消息是指定第三个参数为true

    • 由于监听器监听是异步处理,所以在消息发送之后不能关闭channel

    String msg = "Hello HuangDaoJun!";
    Connection connection = ConnectionUtil.getConnection();     //相当于JDBC操作的数据库连接
    Channel channel = connection.createChannel();               //相当于JDBC操作的statement
    
    //return机制:监控交换机是否将消息分发到队列
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
            //如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)
            System.out.println("*****"+i);  //标识
            System.out.println("*****"+s);  //
            System.out.println("*****"+s1); //交换机名
            System.out.println("*****"+s2); //交换机对应的队列的key
            System.out.println("*****"+new String(bytes));  //发送的消息
        }
    });
    
    //发送消息
    //channel.basicPublish("ex2", "c", null, msg.getBytes());
    channel.basicPublish("ex2", "c", true, null, msg.getBytes());

    4.3 在SpringBoot应用实现消息确认与return监听

    4.3.1 配置application.yml,开启消息确认和return监听
    spring:
      rabbitmq:
        publisher-confirm-type: simple  ## 开启消息确认模式
        publisher-returns: true        ##使用return监听机制

    4.3.2 创建confirm和return监听

    @Component
    public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    
        Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class);
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            //此方法用于监听消息确认结果(消息是否发送到交换机)
            if(b){
                logger.info("-------消息成功发送到交换机");
            }else{
                logger.warn("-------消息发送到交换机失败");
            }
        }
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            //此方法用于return监听(当交换机分发消息到队列失败时执行)
            logger.warn("~~~~~~~交换机分发消息到队列失败");
        }
    }

    五、延迟机制

    5.1 延迟队列

    • 延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费

    • AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能

    • TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间

    • 在创建队列的时候可以设置队列的存活时间,当消息进入到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;

    • 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;

    • 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

    5.2 使用延迟队列实现订单支付监控

    5.2.1 实现流程图

    5.2.2 创建交换机和队列

    2.创建消息队列

    3.创建死信队列

    4.队列绑定

  • 相关阅读:
    Cg:访问OpenGL的状态
    C++ Exception Handling
    C语言的调用规约(Calling Convension)之参数传递和返回值
    Why is FBX readonly in animation editor when imported?
    如何在Visual Studio中编译wxWidgets
    ICU字符集编码转换一例
    VisTools
    关于数值分析和LCP问题的一些开源项目
    C++: The Case Against Global Variables
    老男孩筷子兄弟
  • 原文地址:https://www.cnblogs.com/jikeyi/p/13339186.html
Copyright © 2011-2022 走看看