zoukankan      html  css  js  c++  java
  • springboot中使用RabbitMq消息队列使用笔记(使用配置版本)

    rabbitmq是什么,怎么搭建我在这就不叙述了,这里只说怎么使用。

    说个小插曲,如果有的人连接rabbitmq非常慢,经常超时或者启动rabbitmq很慢,可以在linux环境下配置一下就行了:

    第一步:打开linux输入以下命令

    [root@hadoop1 mq]# hostname
    hadoop1

    这里的hadoop1就是我的主机名,然后把这个主机名配置进hosts文件即可;

    第二步:vim /etc/hosts 命令对hosts文件进行添加一行数据:

    主机名hadoop1为例
    只需在hosts文件中加入 127.0.0.1 hadoop1
    然后wq保存退出即可。

     第三步:重启rabbitmq应该就可以解决连接,启动过慢的问题了;

    =====================================================

    以下是使用rabbitmq的过程。

    给大家推荐https://blog.csdn.net/qq_35387940/article/details/100514134这个博客,我的案例也是参照这个写的,修改部分代码加上自己的心得体会,有些原作者没有的注释我都加上了。可以先去看原作者的,然后再看这个。

     

     一个交换机可以连接多个队列,生产者发送信息给交换机,交换机根据生产者带来的路由关键字来确认传递给哪个队列,消费者可以监听一个或多个队列,当队列里存在消息的时候就会消费。

    第一步:添加相关依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    第二步:编写application.yml主配置文件,有的配置看情况而定

    server:
      port: 8888
    spring:
      rabbitmq:
        host: 192.168.1.149
        username: qjwl
        password: 123456
        virtual-host: /qjwl
        template:
          retry:
            #enabled:开启失败重试
            enabled: true
            #第一次重试的间隔时长
            initial-interval: 10000ms
            #最长重试间隔,超过这个间隔将不再重试
            max-interval: 300000ms
            #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
            multiplier: 2
    

    第三步:编写配置类

    @Configuration
    public class DirectConfig {
        /**
         * 配置一个队列,可以配置多个
       *第一个参数是队列的名称 *第二个参数durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,但并不是消息的持久化 *当消息代理重启时仍然存在,暂存队列:当前连接有效 *第三个参数exclusive(独有的):默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable *第四个参数autoDelete:是否自动删除,默认是false,当没有生产者或者消费者使用此队列,该队列会自动删除。 *return new Queue("TestDirectQueue",true,true,false); *一般设置一下队列的持久化就好,其余两个就是默认false
    */ @Bean public Queue myQueue1 () { return new Queue("directQueue111",true); }

      @Bean
        public Queue myQueue2 () {
            return new Queue("directQueue222",true);
        }
    
    
      /**
         * 给交换机起名字,也可以配置多个交换机
       *第一个参数是交换机的名字,
       *第二个参数durable:是否持久化,默认false,持久化交换机
       *第三个参数autoDelete,默认为false,同队列解释
      
        如果你是topic模式的,那么交换机类型就是TopicExchange,如果是广播(fanout)类型的就是FanoutExchange
       */
        @Bean
        public DirectExchange myDirectExchange() {
            return new DirectExchange("directExchange",true,false);
        }
    
    
        @Bean
        public DirectExchange lonelyExchange() {
            return new DirectExchange("lonelyExchange",true,false);
        }
    
        /*绑定  将队列和交换机绑定, 并设置用于匹配键:mykey
        队列在前,交换机在后
      1.如果是fanout的模式的话,就没有后面的with路由关键字
      2.如果是topic模式的话是支持通配符的

        通配符规则:
    ​      `#`:匹配一个或多个词
    ​      `*`:匹配不多不少恰好1个词
        举例:
    ​      `audit.#`:能够匹配`audit.irs.corporate`队列 或者 `audit.irs`队列
    ​      `audit.*`:只能匹配`audit.irs`队列

        */

        @Bean
        public Binding directBinding(){
            return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("mykey");
        //return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("audit.#"); } }

    第四步:写个接口方法进行测试

       @Autowired
       private AmqpTemplate amqpTemplate;
       @GetMapping("/sendDirectMessage")
        @ResponseBody
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "test message, hello!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            //将消息携带绑定键值:路由关键字mykey 发送到交换机 directExchange
            amqpTemplate.convertAndSend("directExchange", "mykey", map);
            return "ok";
        }

    红框圈起来的是比较常用的3个方法,分别是:

    - 指定交换机、RoutingKey和消息体
    - 指定消息
    - 指定RoutingKey和消息,会向默认的交换机发送消息

    到此,生产者已经编写完成,一般来说我们生产者和消费者在不同的模块里,我也是采用这种模式,编写两个spring boot模块。

    第五步:编写消费者,配置跟生产者基本一致。

    依赖包不变,编写application.yml,注意端口不能冲突

    server:
      port: 9999
    spring:
      rabbitmq:
        host: 192.168.1.149
        username: qjwl
        password: 123456
        virtual-host: /qjwl

    第六步:编写消费者,也就是一个监听器,监听自己队列是否有数据。

    @Component//必须要写
    //@RabbitListener(queues = {"directQueue111","directQueue222"})
    public class DirectListener {
    
        /**
         * RabbitListener 注解放在类上就需要使用RabbitHandler注解放在方法上进行配合使用,
         * 根据传递过来的参数类型判断那个方法执行,如果传递过来的是String类型的,就去找带有@RabbitHandler注解的方法,参数是String的
       * 如果传递过来的是map类型的,就找参数是map的方法
       *@RabbitListener也可以直接放在方法上监听队列,如果该队列存在消息,就由这个方法来接收处理
    */ //@RabbitHandler @RabbitListener(queues = {"directQueue111"}) public void process(Map map) { System.out.println("DirectReceiver111消费者收到消息 : " + map.toString()); }
      @RabbitListener(queues = {"directQueue222"})
      public void process(Map map) {
      System.out.println("DirectReceiver222消费者收到消息 : " + map.toString());
      }
    }

    一般情况下消息队列的使用已经介绍完毕了,但是有的小伙伴就会问,我怎么知道我的消息是发出去了没有?消费者收到了没有?好的,这就涉及到了生产者的确认机制和消费者的确认机制。

    我们先来说生产者的确认机制在生产者application.yml中添加消息确认配置

    server:
      port: 8888
    spring:
      rabbitmq:
        host: 192.168.1.149
        username: qjwl
        password: 123456
        virtual-host: /qjwl
        template:
          retry:
            #enabled:开启失败重试
            enabled: true
            #第一次重试的间隔时长
            initial-interval: 10000ms
            #最长重试间隔,超过这个间隔将不再重试
            max-interval: 300000ms
            #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
            multiplier: 2
            
        #确认消息已发送到交换机(Exchange)
        publisher-confirms: true
        #确认消息已发送到队列(Queue)
        publisher-returns: true

     在生产者方编写一个配置文件RabbitConfig:

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //Mandatory强制的
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
                    System.out.println("ConfirmCallback:     " + "确认情况:" + b);
                    System.out.println("ConfirmCallback:     " + "原因:" + s);
                }
            });
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     " + "消息:" + message);
                    System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
                    System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
                    System.out.println("ReturnCallback:     " + "交换机:" + exchange);
                    System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
                }
            });
            return rabbitTemplate;
        }
    }

    生产者消息确认无非就四种情况:先把结论也写了出来

    ①消息推送到server,但是在server里找不到交换机 --》 触发confirm回调失败
    ②消息推送到server,找到交换机了,但是没找到队列 --》 先触发confirm回调返回true,然后再触发return返回失败
    ③消息推送到sever,交换机和队列啥都没找到 --》 触发confirm回调失败
    ④消息推送成功

    ①消息推送到server,但是在server里找不到交换机
    写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)
     @GetMapping("/TestMessageAck")
        public String TestMessageAck() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: non-existent-exchange test message ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
            return "ok";
        }

    调用接口,查看rabbitmq-provuder项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

    2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)
    ConfirmCallback:     相关数据:null
    ConfirmCallback:     确认情况:false
    ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)

    结论: ①这种情况触发的是 ConfirmCallback 回调函数。

    ②消息推送到server,找到交换机了,但是没找到队列  
    这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

    @Bean
    DirectExchange lonelyDirectExchange() {
      return new DirectExchange("lonelyDirectExchange");
    }

    然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

    @GetMapping("/TestMessageAck2")
    public String TestMessageAck2() {
      String messageId = String.valueOf(UUID.randomUUID());
      String messageData = "message: lonelyDirectExchange test message ";
      String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
      Map<String, Object> map = new HashMap<>();
      map.put("messageId", messageId);
      map.put("messageData", messageData);
      map.put("createTime", createTime);
      rabbitTemplate.convertAndSend("lonelyDirectExchange", "DirectRouting", map);
      return "ok";
    }

    调用接口,查看rabbitmq-provuder项目的控制台输出情况:

    ReturnCallback: 消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    ReturnCallback: 回应码:312
    ReturnCallback: 回应信息:NO_ROUTE
    ReturnCallback: 交换机:lonelyDirectExchange
    ReturnCallback: 路由键:TestDirectRouting
    ConfirmCallback: 相关数据:null
    ConfirmCallback: 确认情况:true
    ConfirmCallback: 原因:null

    可以看到这种情况,两个函数都被调用了;
    这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;
    而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
      结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

     

    ③消息推送到sever,交换机和队列啥都没找到 
    这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。
      结论: ③这种情况触发的是 ConfirmCallback 回调函数。

     

     ④消息推送成功
    那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage接口,可以看到控制台输出:

    ConfirmCallback: 相关数据:null
    ConfirmCallback: 确认情况:true
    ConfirmCallback: 原因:null

    结论: ④这种情况触发的是 ConfirmCallback 回调函数。

    以上是生产者推送消息的消息确认 回调函数的使用介绍(可以在回调函数根据需求做对应的扩展或者业务数据处理)。

    下面我们继续说一说消费者的消息确认机制:
    把消费者自动确认消息改为手动确认:
    server:
      port: 8888
    spring:
      rabbitmq:
        host: 192.168.1.149
        username: qjwl
        password: 123456
        virtual-host: /qjwl
        template:
          retry:
            #enabled:开启失败重试
            enabled: true
            #第一次重试的间隔时长
            initial-interval: 10000ms
            #最长重试间隔,超过这个间隔将不再重试
            max-interval: 300000ms
            #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
            multiplier: 2
        #确认消息已发送到交换机(Exchange)
        publisher-confirms: true
        #确认消息已发送到队列(Queue)
        publisher-returns: true
    #   手动确认消息(默认是自动确认):
        listener:
          simple:
            acknowledge-mode: manual
    
    
    还是那个在消费者方编写监听器:
    @Component
    public class DirectListener {
    /**
    * 手动确认机制
    */
    @RabbitListener(queues = {"directQueue111"})
    public void userInsert(Map map, Channel channel, Message message) throws IOException {

    try {
         //Message中是rabbitmq管道中的所有信息,包括了消息体,队列名称,交换机名称,唯一标志,状态码等基本信息
    System.out.println(message);
         
        //这个map就是到接收的消息

    System.out.println(map.toString());

        //同一时刻服务器只会发一条消息给消费者(能者多劳模式)

    //channel.basicQos(1);

    /**
    * deliveryTag:该消息的index
    * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    */
    //确认签收
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
    //签收失败
    /**
    * deliveryTag:该消息的index
    * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
    * requeue:被拒绝的是否重新入队列
    */
    //如果重新放进队列中还是会放在队列头部,继续消费者消费,如果一直消费一直错误就会产生堆积问题,理性使用
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
    }

    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    deliveryTag:该消息的index
    requeue:被拒绝的是否重新入队列


    channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


    如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接。
    如果rabbitmq断开,连接后会自动重新推送。如果消费端应用重启,消息会自动重新推送。

    东西呢就是这么个东西,大部分都是跟另外一篇博客一致的,只有消费者消息确认这一块差异多点。over
  • 相关阅读:
    javascript:void(0) 真正含义
    Memcache and Mongodb
    window下redis nosql初试
    CAS单点登录配置
    代理模式-你不并知道我的存在
    Netty In Action中文版
    【FastDev4Android框架开发】打造QQ6.X最新版本号側滑界面效果(三十八)
    同步并发操作之等待一次性事件
    关于Java特种兵下冊
    自己定义UISlider的样式和滑块
  • 原文地址:https://www.cnblogs.com/sun2020/p/12803830.html
Copyright © 2011-2022 走看看