zoukankan      html  css  js  c++  java
  • rabbitmq的延迟消息队列实现

    第一部分:延迟消息的实现原理和知识点

    使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。

    消息的TTL(Time To Live)

    消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

    可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

    当上面的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

    Dead Letter Exchanges

    Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

    1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

    2. 上面的消息的TTL到了,消息过期了。

    3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

    Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

    实现延迟队列

    延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

    生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。

    完成延迟任务的实现。

     第二部分:具体实现例子

    1、新建立消息队列配置文件rabbitmq.properties

     1 #rabbitmq消息队列的属性配置文件properties
     2 rabbitmq.study.host=192.168.56.101
     3 rabbitmq.study.username=duanml
     4 rabbitmq.study.password=1qaz@WSX
     5 rabbitmq.study.port=5672
     6 rabbitmq.study.vhost=studymq
     7 
     8 #Mail 消息队列的相关变量值
     9 mail.exchange=mailExchange
    10 mail.exchange.key=mail_queue_key
    11 
    12 
    13 #Phone 消息队列的相关变量值
    14 phone.topic.key=phone.one
    15 phone.topic.key.more=phone.one.more
    16 
    17 #delay 延迟消息队列的相关变量值
    18 delay.directQueue.key=TradePayNotify_delay_2m
    19 delay.directMessage.key=TradePayNotify_delay_3m

    2、新建立配置文件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
     6 
     7     <!--利用rabbitmq的TTL和延迟队列,实现延迟通知任务的例子
     8         1、申明了一个订单通知服务的队列  queue_Notify
     9         2、申明了一个延迟队列Notify_delay_15s,给整个队列设置消息过期时间 为15秒  ——————》 queue ttl  例子
    10         3、申明了一个延迟队列Notify_delay_30s  给发送到这个队列的消息,消息本身设置过期时间 ————————》  message ttl  例子
    11         4、当消息发送到2、3队列的时候,达到了过期时间,即转发到订单通知服务工作队列 1、
    12         5、给队列1 配置消费者服务工作监听,即可完成延迟任务的结果。
    13     -->
    14 
    15     <!-- ################ 订单通知服务消费者配置 ################ -->
    16     <!--队列声明-->
    17     <rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/>
    18 
    19     <!-- 订单通知服务消费者 exchange -->
    20     <rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false">
    21         <rabbit:bindings>
    22             <rabbit:binding queue="queue_Notify" key="TradePayNotify"/>
    23         </rabbit:bindings>
    24     </rabbit:direct-exchange>
    25 
    26     <!-- 订单通知监听处理器 -->
    27     <bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/>
    28     <!--订单消息队列确认回调-->
    29     <bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean>
    30     <!--订单消息队列消息发送失败回调-->
    31     <bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean>
    32 
    33     <!-- 监听器acknowledge=manual表示手工确认消息已处理(异常时可以不确认消息),auto表示自动确认(只要不抛出异常,消息就会被消费) -->
    34     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    35         <rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/>
    36     </rabbit:listener-container>
    37 
    38     <!--*****************************************分割线*********************************************************-->
    39 
    40     <!-- ################ 延迟队列生产者配置 ################ -->
    41     <rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay"
    42                      connection-factory="connectionFactory"
    43                      confirm-callback="notifyConfirmCallBackListener"
    44                      return-callback="notifyFailedCallBackListener"
    45                      message-converter="jsonMessageConverter"/>
    46 
    47     <!--配置生产消息的延迟队列操作主体类-->
    48     <bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl">
    49         <property name="rabbitTemplate" ref="rabbitTemplateDelay"></property>
    50     </bean>
    51 
    52     <!--申明一个延迟队列,给整个队列的消息设置消息过期时间 x-message-ttl 2分钟
    53         当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到
    54         exchange:trade_direct
    55         routing-key:TradePayNotify
    56         满足如上两点的队列中去即为:queue_Notify
    57     -->
    58     <rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false"
    59                   exclusive="false">
    60         <rabbit:queue-arguments>
    61             <entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/>
    62             <entry key="x-dead-letter-exchange" value="trade_direct"/>
    63             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
    64         </rabbit:queue-arguments>
    65     </rabbit:queue>
    66 
    67     <!--申明一个延迟队列,在发送消息的时候给消息设置过期时间 3分钟
    68            当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到
    69            exchange:trade_direct
    70            routing-key:TradePayNotify
    71            满足如上两点的队列中去即为:queue_Notify
    72     -->
    73     <rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false"
    74                   exclusive="false">
    75         <rabbit:queue-arguments>
    76             <entry key="x-dead-letter-exchange" value="trade_direct"/>
    77             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
    78         </rabbit:queue-arguments>
    79     </rabbit:queue>
    80 
    81     <!-- 延迟队列工作的 exchange -->
    82     <rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false">
    83         <rabbit:bindings>
    84             <rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/>
    85             <rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/>
    86         </rabbit:bindings>
    87     </rabbit:direct-exchange>
    88 
    89 </beans>

    3、新建立延迟队列测试Controller

      1 package org.seckill.web;
      2 
      3 import org.seckill.dto.SeckillResult;
      4 import org.seckill.entity.Seckill;
      5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
      6 import org.seckill.utils.rabbitmq.MQProducer;
      7 import org.slf4j.Logger;
      8 import org.slf4j.LoggerFactory;
      9 import org.springframework.amqp.core.Message;
     10 import org.springframework.beans.factory.annotation.Autowired;
     11 import org.springframework.beans.factory.annotation.Value;
     12 import org.springframework.stereotype.Controller;
     13 import org.springframework.web.bind.annotation.RequestMapping;
     14 import org.springframework.web.bind.annotation.ResponseBody;
     15 
     16 import java.util.Date;
     17 
     18 /**
     19  * <p>Title: org.seckill.web</p>
     20  * <p>Company:东软集团(neusoft)</p>
     21  * <p>Copyright:Copyright(c)2018</p>
     22  * User: 段美林
     23  * Date: 2018/5/30 17:33
     24  * Description: 消息队列测试
     25  */
     26 @Controller
     27 @RequestMapping("/rabbitmq")
     28 public class RabbitmqController {
     29 
     30     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     31  40 
     41     @Value("${delay.directQueue.key}")
     42     private String delay_directQueue_key;
     43 
     44     @Value("${delay.directMessage.key}")
     45     private String delay_directMessage_key;
     46  52 
     53     @Autowired
     54     private MQProducerImpl delayMQProducerImpl;111 
    112     /**
    113      * @Description: 消息队列
    114      * @Author:
    115      * @CreateTime:
    116      */
    117     @ResponseBody
    118     @RequestMapping("/sendDelayQueue")
    119     public SeckillResult<Long> testDelayQueue() {
    120         SeckillResult<Long> result = null;
    121         Date now = new Date();
    122         try {
    123             Seckill seckill = new Seckill();
    124        //第一种情况,给队列设置消息ttl,详情见配置文件
    125             for (int i = 0; i < 2; i++) {
    126                 seckill.setSeckillId(1922339387 + i);
    127                 seckill.setName("delay_queue_ttl_" + i);
    128                 String msgId = delayMQProducerImpl.getMsgId();
    129                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
    130                 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
    131             }
    132         //第二种情况,给消息设置ttl
    133             for (int i = 0; i < 2; i++) {
    134                 seckill.setSeckillId(1922339287 + i);
    135                 seckill.setName("delay_message_ttl_" + i);
    136                 String msgId = delayMQProducerImpl.getMsgId();
    137                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
    138                 if (message != null) {
    139                     //给消息设置过期时间ttl,为3分钟
    140                     message.getMessageProperties().setExpiration("180000");
    141                     delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
    142                 }
    143             }
    144             result = new SeckillResult<Long>(true, now.getTime());
    145         } catch (Exception e) {
    146             logger.error(e.getMessage(), e);
    147         }
    148         return result;
    149     }
    150 
    151 }

    4、编写延迟消息确认类和监听类:

    NotifyConfirmCallBackListener.java
     1 package org.seckill.rabbitmqListener.notify;
     2 
     3 import org.slf4j.Logger;
     4 import org.slf4j.LoggerFactory;
     5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
     6 import org.springframework.amqp.rabbit.support.CorrelationData;
     7 
     8 /**
     9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
    10  * <p>Company:东软集团(neusoft)</p>
    11  * <p>Copyright:Copyright(c)2018</p>
    12  * User: 段美林
    13  * Date: 2018/6/3 0:27
    14  * Description: 延迟任务测试--->消息确认回调类
    15  */
    16 public class NotifyConfirmCallBackListener implements ConfirmCallback {
    17 
    18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    19 
    20     /**
    21      * Confirmation callback.
    22      *
    23      * @param correlationData correlation data for the callback.
    24      * @param ack             true for ack, false for nack
    25      * @param cause           An optional cause, for nack, when available, otherwise null.
    26      */
    27     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    28         logger.info("延迟测试---确认消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
    29     }
    30 }
    NotifyConsumerListener.java
     1 package org.seckill.rabbitmqListener.notify;
     2 
     3 import com.alibaba.fastjson.JSONObject;
     4 import com.rabbitmq.client.Channel;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 import org.springframework.amqp.core.Message;
     8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
     9 
    10 /**
    11  * <p>Title: org.seckill.rabbitmqListener.notify</p>
    12  * <p>Company:东软集团(neusoft)</p>
    13  * <p>Copyright:Copyright(c)2018</p>
    14  * User: 段美林
    15  * Date: 2018/6/3 0:27
    16  * Description: 订单通知队列监听服务
    17  * 实现延迟任务的功能
    18  */
    19 public class NotifyConsumerListener implements ChannelAwareMessageListener {
    20 
    21 
    22     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    23 
    24     /**
    25      * Callback for processing a received Rabbit message.
    26      * <p>Implementors are supposed to process the given Message,
    27      * typically sending reply messages through the given Session.
    28      *
    29      * @param message the received AMQP message (never <code>null</code>)
    30      * @param channel the underlying Rabbit Channel (never <code>null</code>)
    31      * @throws Exception Any.
    32      */
    33     public void onMessage(Message message, Channel channel) throws Exception {
    34         try {
    35             //将字节流对象转换成Java对象
    36 //            Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
    37 
    38             String returnStr = new String(message.getBody(),"UTF-8");
    39             JSONObject jsStr = JSONObject.parseObject(returnStr);
    40 
    41             logger.info("延迟测试--消费开始:名称为--===>" + jsStr.getString("name") + "----->返回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
    42 
    43             //TODO 进行相关业务操作
    44 
    45             //成功处理业务,那么返回消息确认机制,这个消息成功处理OK
    46             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    47 
    48         } catch (Exception e) {
    49             if (message.getMessageProperties().getRedelivered()) {
    50                 //消息已经进行过一次轮询操作,还是失败,将拒绝再次接收本消息
    51                 logger.info("消息已重复处理失败,拒绝再次接收...");
    52                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
    53 
    54                 //TODO 进行相关业务操作
    55 
    56             } else {
    57                 //消息第一次接收处理失败后,将再此回到队列中进行  再一次轮询操作
    58                 logger.info("消息即将再次返回队列处理...");
    59                 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中
    60                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    61             }
    62         }
    63     }
    64 }
    NotifyFailedCallBackListener.java
     1 package org.seckill.rabbitmqListener.notify;
     2 
     3 import org.slf4j.Logger;
     4 import org.slf4j.LoggerFactory;
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
     7 
     8 /**
     9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
    10  * <p>Company:东软集团(neusoft)</p>
    11  * <p>Copyright:Copyright(c)2018</p>
    12  * User: 段美林
    13  * Date: 2018/6/3 0:28
    14  * Description: 延迟任务测试----> 消息发送失败回调类
    15  */
    16 public class NotifyFailedCallBackListener implements ReturnCallback {
    17 
    18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    19 
    20     /**
    21      * Returned message callback.
    22      *
    23      * @param message    the returned message.
    24      * @param replyCode  the reply code.
    25      * @param replyText  the reply text.
    26      * @param exchange   the exchange.
    27      * @param routingKey the routing key.
    28      */
    29     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    30         logger.info("延迟测试------------->return--message:" +
    31                 new String(message.getBody()) +
    32                 ",replyCode:" + replyCode + ",replyText:" + replyText +
    33                 ",exchange:" + exchange + ",routingKey:" + routingKey);
    34     }
    35 }

    5、编写消息队列的操作类和接口:

    MQProducer.java
      1 package org.seckill.utils.rabbitmq;
      2 
      3 import org.springframework.amqp.core.Message;
      4 import org.springframework.amqp.core.MessagePostProcessor;
      5 import org.springframework.amqp.rabbit.support.CorrelationData;
      6 
      7 /**
      8  * <p>Title: org.seckill.utils.rabbitmq</p>
      9  * <p>Company:东软集团(neusoft)</p>
     10  * <p>Copyright:Copyright(c)2018</p>
     11  * User: 段美林
     12  * Date: 2018/5/30 11:49
     13  * Description: No Description
     14  */
     15 public interface MQProducer {
     16 
     17     /**
     18      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     19      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     20      * @param message
     21      */
     22     void sendDataToRabbitMQ(java.lang.Object message);
     23 
     24     /**
     25      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     26      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     27      * @param message
     28      * @param messagePostProcessor
     29      */
     30     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
     31 
     32     /**
     33      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     34      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     35      * @param message
     36      * @param messagePostProcessor
     37      * @param correlationData
     38      */
     39     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
     40 
     41     /**
     42      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     43      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     44      * @param routingKey
     45      * @param message
     46      */
     47     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
     48 
     49     /**
     50      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     51      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     52      * @param routingKey
     53      * @param message
     54      * @param correlationData
     55      */
     56     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
     57 
     58     /**
     59      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     60      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     61      * @param routingKey
     62      * @param message
     63      * @param messagePostProcessor
     64      */
     65     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
     66 
     67     /**
     68      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     69      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     70      * @param routingKey
     71      * @param message
     72      * @param messagePostProcessor
     73      * @param correlationData
     74      */
     75     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
     76 
     77     /**
     78      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
     79      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     80      * @param exchange
     81      * @param routingKey
     82      * @param message
     83      */
     84     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
     85 
     86     /**
     87      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
     88      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     89      * @param exchange
     90      * @param routingKey
     91      * @param message
     92      * @param correlationData
     93      */
     94     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
     95 
     96     /**
     97      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
     98      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     99      * @param exchange
    100      * @param routingKey
    101      * @param message
    102      * @param messagePostProcessor
    103      */
    104     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
    105 
    106     /**
    107      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    108      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    109      * @param exchange
    110      * @param routingKey
    111      * @param message
    112      * @param messagePostProcessor
    113      * @param correlationData
    114      */
    115     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
    116 
    117     Message messageBuil(Object handleObject, String msgId);
    118 
    119     String getMsgId();
    120 }
    MQProducerImpl.java
      1 package org.seckill.utils.rabbitmq.Impl;
      2 
      3 import com.alibaba.fastjson.JSONObject;
      4 import org.seckill.utils.rabbitmq.MQProducer;
      5 import org.slf4j.Logger;
      6 import org.slf4j.LoggerFactory;
      7 import org.springframework.amqp.AmqpException;
      8 import org.springframework.amqp.core.Message;
      9 import org.springframework.amqp.core.MessageBuilder;
     10 import org.springframework.amqp.core.MessagePostProcessor;
     11 import org.springframework.amqp.core.MessageProperties;
     12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     13 import org.springframework.amqp.rabbit.support.CorrelationData;
     14 import org.springframework.stereotype.Component;
     15 
     16 import java.io.UnsupportedEncodingException;
     17 import java.util.UUID;
     18 
     19 /**
     20  * <p>Title: org.seckill.utils.rabbitmq.Impl</p>
     21  * <p>Company:东软集团(neusoft)</p>
     22  * <p>Copyright:Copyright(c)2018</p>
     23  * User: 段美林
     24  * Date: 2018/6/2 22:54
     25  * Description: 消息生产者操作主体类
     26  */
     27 @Component
     28 public class MQProducerImpl implements MQProducer{
     29 
     30     private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
     31 
     32     private RabbitTemplate rabbitTemplate;
     33 
     34     /**
     35      * Sets the rabbitTemplate.
     36      * <p>
     37      * <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p>
     38      *
     39      * @param rabbitTemplate rabbitTemplate
     40      */
     41     public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
     42         this.rabbitTemplate = rabbitTemplate;
     43     }
     44 
     45     /**
     46      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     47      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     48      *
     49      * @param message
     50      */
     51     public void sendDataToRabbitMQ(Object message) {
     52         try {
     53             if (message instanceof Message){
     54                 Message messageSend = (Message) message;
     55                 String msgId = messageSend.getMessageProperties().getCorrelationId();
     56                 CorrelationData correlationData = new CorrelationData(msgId);
     57                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
     58             }else {
     59                 rabbitTemplate.convertAndSend(message);
     60             }
     61         } catch (AmqpException e) {
     62             logger.error(e.getMessage(), e);
     63         }
     64     }
     65 
     66     /**
     67      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     68      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     69      *
     70      * @param message
     71      * @param messagePostProcessor
     72      */
     73     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
     74         try {
     75             if (message instanceof Message){
     76                 Message messageSend = (Message) message;
     77                 String msgId = messageSend.getMessageProperties().getCorrelationId();
     78                 CorrelationData correlationData = new CorrelationData(msgId);
     79                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
     80             }else {
     81                 rabbitTemplate.convertAndSend(message, messagePostProcessor);
     82             }
     83         } catch (AmqpException e) {
     84             logger.error(e.getMessage(), e);
     85         }
     86     }
     87 
     88     /**
     89      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     90      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     91      *
     92      * @param message
     93      * @param messagePostProcessor
     94      * @param correlationData
     95      */
     96     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
     97         try {
     98             rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
     99         } catch (AmqpException e) {
    100             logger.error(e.getMessage(), e);
    101         }
    102     }
    103 
    104     /**
    105      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    106      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    107      *
    108      * @param routingKey
    109      * @param message
    110      */
    111     public void sendDataToRabbitMQ(String routingKey, Object message) {
    112         try {
    113             if (message instanceof Message){
    114                 Message messageSend = (Message) message;
    115                 String msgId = messageSend.getMessageProperties().getCorrelationId();
    116                 CorrelationData correlationData = new CorrelationData(msgId);
    117                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
    118             }else {
    119                 rabbitTemplate.convertAndSend(routingKey, message);
    120             }
    121         } catch (AmqpException e) {
    122             logger.error(e.getMessage(), e);
    123         }
    124     }
    125 
    126     /**
    127      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    128      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    129      *
    130      * @param routingKey
    131      * @param message
    132      * @param correlationData
    133      */
    134     public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
    135         try {
    136             rabbitTemplate.convertAndSend(routingKey, message, correlationData);
    137         } catch (AmqpException e) {
    138             logger.error(e.getMessage(), e);
    139         }
    140     }
    141 
    142     /**
    143      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    144      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    145      *
    146      * @param routingKey
    147      * @param message
    148      * @param messagePostProcessor
    149      */
    150     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
    151         try {
    152             if (message instanceof Message){
    153                 Message messageSend = (Message) message;
    154                 String msgId = messageSend.getMessageProperties().getCorrelationId();
    155                 CorrelationData correlationData = new CorrelationData(msgId);
    156                 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
    157             }else {
    158                 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
    159             }
    160         } catch (AmqpException e) {
    161             logger.error(e.getMessage(), e);
    162         }
    163     }
    164 
    165     /**
    166      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    167      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    168      *
    169      * @param routingKey
    170      * @param message
    171      * @param messagePostProcessor
    172      * @param correlationData
    173      */
    174     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
    175         try {
    176             rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
    177         } catch (AmqpException e) {
    178             logger.error(e.getMessage(), e);
    179         }
    180     }
    181 
    182     /**
    183      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    184      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    185      *
    186      * @param exchange
    187      * @param routingKey
    188      * @param message
    189      */
    190     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
    191         try {
    192             if (message instanceof Message){
    193                 Message messageSend = (Message) message;
    194                 String msgId = messageSend.getMessageProperties().getCorrelationId();
    195                 CorrelationData correlationData = new CorrelationData(msgId);
    196                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
    197             }else {
    198                 rabbitTemplate.convertAndSend(exchange, routingKey, message);
    199             }
    200         } catch (AmqpException e) {
    201             logger.error(e.getMessage(), e);
    202         }
    203     }
    204 
    205     /**
    206      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    207      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    208      *
    209      * @param exchange
    210      * @param routingKey
    211      * @param message
    212      * @param correlationData
    213      */
    214     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
    215         try {
    216             rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    217         } catch (AmqpException e) {
    218             logger.error(e.getMessage(), e);
    219         }
    220     }
    221 
    222     /**
    223      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    224      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    225      *
    226      * @param exchange
    227      * @param routingKey
    228      * @param message
    229      * @param messagePostProcessor
    230      */
    231     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
    232         try {
    233             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
    234         } catch (AmqpException e) {
    235             logger.error(e.getMessage(), e);
    236         }
    237     }
    238 
    239     /**
    240      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    241      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    242      *
    243      * @param exchange
    244      * @param routingKey
    245      * @param message
    246      * @param messagePostProcessor
    247      * @param correlationData
    248      */
    249     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
    250         try {
    251             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
    252         } catch (AmqpException e) {
    253             logger.error(e.getMessage(), e);
    254         }
    255     }
    256 
    257     /**
    258      * 构建Message对象,进行消息发送
    259      * @param handleObject
    260      * @param msgId
    261      * @return
    262      */
    263     public Message messageBuil(Object handleObject, String msgId) {
    264         try {
    265             //先转成JSON
    266             String objectJSON = JSONObject.toJSONString(handleObject);
    267             //再构建Message对象
    268             Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    269                     .setCorrelationId(msgId).build();
    270             return messageBuil;
    271         } catch (UnsupportedEncodingException e) {
    272             logger.error("构建Message出错:" + e.getMessage(),e);
    273             return null;
    274         }
    275     }
    276 
    277     /**
    278      * 生成唯一的消息操作id
    279      * @return
    280      */
    281     public String getMsgId() {
    282         return UUID.randomUUID().toString();
    283     }
    284 
    285 }

    至此就完成了延迟消息队列的所有代码实现,

  • 相关阅读:
    webpack源码学习总结
    并发容器(三)非阻塞队列的并发容器
    并发容器(二)阻塞队列详细介绍
    并发容器(一)同步容器 与 并发容器
    java内存模型(二)深入理解java内存模型的系列好文
    java内存模型(一)正确使用 Volatile 变量
    原子操作类(二)原子操作的实现原理
    原子操作类(一)原子操作类详细介绍
    同步锁源码分析(一)AbstractQueuedSynchronizer原理
    并发工具类(五) Phaser类
  • 原文地址:https://www.cnblogs.com/yinfengjiujian/p/9204600.html
Copyright © 2011-2022 走看看