zoukankan      html  css  js  c++  java
  • rabbitmq 和Spring 集成 实现(一)

    1、增加pom.xml依赖

     1 <!--rabbitmq消息队列依赖架包-->
     2     <dependency>
     3       <groupId>org.springframework.amqp</groupId>
     4       <artifactId>spring-rabbit</artifactId>
     5       <version>2.0.3.RELEASE</version>
     6     </dependency>
     7 
     8     <!-- rabbitmq消息队列客户端依赖 -->
     9     <dependency>
    10       <groupId>com.rabbitmq</groupId>
    11       <artifactId>amqp-client</artifactId>
    12       <version>5.2.0</version>
    13     </dependency>

    2、在项目的resources目录下新增rabbitmq.properties配置文件

     1 #rabbitmq消息队列的属性配置文件properties
     2 rabbitmq.study.host=192.168.56.101
     3 rabbitmq.study.username=duanml
     4 rabbitmq.study.password=123456
     5 rabbitmq.study.port=5672
     6 rabbitmq.study.vhost=studymq
     7 
     8 #Mail 消息队列的相关变量值
     9 mail.exchange=mailExchange
    10 mail.exchange.key=mail_queue_key
    11 
    12 
    13 #Phone 消息队列的相关变量值
    14 phone.topic.key=phone.one
    15 phone.topic.key.more=phone.one.more

    3、增加spring的配置文件spring-rabbitmq.xml

      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <beans xmlns="http://www.springframework.org/schema/beans"
      3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      5        xmlns:context="http://www.springframework.org/schema/context"
      6        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
      7 
      8     <description>rabbitmq消息队列配置</description>
      9 
     10     <!--属性配置properties文件加载-->
     11     <context:property-placeholder location="classpath:rabbitmq.properties"
     12                                   ignore-unresolvable="true"></context:property-placeholder>
     13 
     14     <!-- rabbitmq消息队列连接配置 -->
     15     <rabbit:connection-factory id="connectionFactory"
     16                                host="${rabbitmq.study.host}"
     17                                username="${rabbitmq.study.username}"
     18                                password="${rabbitmq.study.password}"
     19                                port="${rabbitmq.study.port}"
     20                                publisher-confirms="true"/>
     21 
     22     <rabbit:admin connection-factory="connectionFactory"/>
     23     <!-- 消息对象json转换类 -->
     24     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
     25 
     26 
     27     <!--申明一个消息队列Queue __> 用于发送Mail************************************Start************************-->
     28     <!--
     29     说明:
     30         durable:是否持久化
     31         exclusive: 仅创建者可以使用的私有队列,断开后自动删除
     32         auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
     33     -->
     34     <rabbit:queue id="mailQueue" name="mailQueue" durable="true" auto-delete="false" exclusive="false"/>
     35 
     36     <!--交换机定义
     37         交换机的四种模式:
     38         direct:转发消息到 routigKey 指定的队列。
     39         topic:对 key 进行模式匹配,比如ab*可以传到到所有 ab* 的 queue。
     40         headers:(这个还没有接触到)
     41         fanout:转发消息到所有绑定队列,忽略 routigKey
     42         交换器的属性:
     43         持久性:如果启用,交换器将会在server重启前都有效。
     44         自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
     45         惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
     46      -->
     47     <rabbit:direct-exchange id="mailExchange" name="mailExchange" durable="true" auto-delete="false">
     48         <rabbit:bindings>
     49             <rabbit:binding queue="mailQueue" key="mail_queue_key"/>
     50         </rabbit:bindings>
     51     </rabbit:direct-exchange>
     52 
     53     <!-- 消费者 -->
     54     <bean id="mailConsumerListener" class="org.seckill.rabbitmqListener.mail.MailConsumerListener"></bean>
     55     <!--消息确认后回调方法-->
     56     <bean id="mailConfirmCallBackListener" class="org.seckill.rabbitmqListener.mail.MailConfirmCallBackListener"></bean>
     57     <!--消息失败后Return回调-->
     58     <bean id="mailFailedCallBackListener" class="org.seckill.rabbitmqListener.mail.MailFailedCallBackListener"></bean>
     59 
     60     <!-- spring template声明-->
     61     <rabbit:template id="mailAMQPTemplate" exchange="mailExchange" mandatory="true"
     62                      message-converter="jsonMessageConverter"
     63                      confirm-callback="mailConfirmCallBackListener"
     64                      return-callback="mailFailedCallBackListener"
     65                      connection-factory="connectionFactory"/>
     66 
     67     <!-- 配置监听 -->
     68     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
     69         <!--queues 监听队列,多个用逗号分隔 ref 监听器 -->
     70         <rabbit:listener queues="mailQueue" ref="mailConsumerListener"></rabbit:listener>
     71     </rabbit:listener-container>
     72     <!--申明一个消息队列Queue __> 用于发送Mail************************************End************************-->
     73 
     74 
     75 
     76 
     77     <!--===============================================分割线==================================================-->
     78 
     79 
     80 
     81     <!--申明一个用于发送短信的消息队列__>用于发送短信Phone************************************Start************************-->
     82     <!--
     83     说明:
     84         durable:是否持久化
     85         exclusive: 仅创建者可以使用的私有队列,断开后自动删除
     86         auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
     87     -->
     88     <rabbit:queue id="PhoneQueueOne" name="PhoneQueueOne" durable="true" auto-delete="false" exclusive="false"/>
     89     <rabbit:queue id="PhoneQueueTwo" name="PhoneQueueTwo" durable="true" auto-delete="false" exclusive="false"/>
     90 
     91     <!--交换机定义
     92         交换机的四种模式:
     93         direct:转发消息到 routigKey 指定的队列。
     94         topic:对 key 进行模式匹配,比如ab*可以传到到所有 ab* 的 queue。
     95         headers:(这个还没有接触到)
     96         fanout:转发消息到所有绑定队列,忽略 routigKey
     97         交换器的属性:
     98         持久性:如果启用,交换器将会在server重启前都有效。
     99         自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
    100         惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
    101      -->
    102     <rabbit:topic-exchange id="phoneExchange" name="phoneExchange" durable="true" auto-delete="false">
    103         <rabbit:bindings>
    104             <rabbit:binding queue="PhoneQueueOne" pattern="phone.*"></rabbit:binding>
    105             <rabbit:binding queue="PhoneQueueTwo" pattern="phone.#"></rabbit:binding>
    106         </rabbit:bindings>
    107     </rabbit:topic-exchange>
    108 
    109     <!-- 消费者 -->
    110     <bean id="phoneConsumerListener" class="org.seckill.rabbitmqListener.phone.PhoneConsumerListener"></bean>
    111     <bean id="phoneConsumerListenerMore" class="org.seckill.rabbitmqListener.phone.PhoneConsumerListenerMore"></bean>
    112     <!--消息确认后回调方法-->
    113     <bean id="phoneConfirmCallBackListener" class="org.seckill.rabbitmqListener.phone.PhoneConfirmCallBackListener"></bean>
    114     <!--消息失败后Return回调-->
    115     <bean id="phoneFailedCallBackListener" class="org.seckill.rabbitmqListener.phone.PhoneFailedCallBackListener"></bean>
    116 
    117     <!-- spring template声明-->
    118     <rabbit:template id="phoneAMQPTemplate" exchange="phoneExchange" mandatory="true"
    119                      message-converter="jsonMessageConverter"
    120                      confirm-callback="phoneConfirmCallBackListener"
    121                      return-callback="phoneFailedCallBackListener"
    122                      connection-factory="connectionFactory"/>
    123 
    124     <!-- 配置监听 -->
    125     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    126         <!--queues 监听队列,多个用逗号分隔 ref 监听器 -->
    127         <rabbit:listener queues="PhoneQueueOne" ref="phoneConsumerListener"></rabbit:listener>
    128         <rabbit:listener queues="PhoneQueueOne,PhoneQueueTwo" ref="phoneConsumerListenerMore"></rabbit:listener>
    129     </rabbit:listener-container>
    130     <!--申明一个用于发送短信的消息队列__>用于发送短信Phone************************************End************************-->
    131 
    132 
    133 </beans>

    注意:

      这里配置了两种常用的交换机:

    direct-exchange                 对应  处理邮件的 Mail 的 消息队列
    队列名称:mailQueue
    操作Mail队列的 生产者使用的 mailAMQPTemplate
    监听Mail队列的消费者 mailConsumerListener
    确认消息处理成功的回调:mailConfirmCallBackListener
    消息发送队列失败回调:mailFailedCallBackListener
    topic-exchange           对应  处理短信发送 Phone 的消息队列
     队列名称:PhoneQueueOne、PhoneQueueTwo
      操作Phone队列的 生产者使用的 phoneAMQPTemplate
    监听Phone队列的消费者 phoneConsumerListener、phoneConsumerListenerMore
    确认消息处理成功的回调:phoneConfirmCallBackListener
    消息发送队列失败回调:phoneFailedCallBackListener
    4、编写实现的具体类
    消息接口类:MQProducer.java
     1 package org.seckill.utils.rabbitmq;
     2 
     3 import org.springframework.amqp.core.MessagePostProcessor;
     4 
     5 /**
     6  * <p>Title: org.seckill.utils.rabbitmq</p>
     7  * <p>Company:东软集团(neusoft)</p>
     8  * <p>Copyright:Copyright(c)2018</p>
     9  * User: 段美林
    10  * Date: 2018/5/30 11:49
    11  * Description: No Description
    12  */
    13 public interface MQProducer {
    14 
    15     /**
    16      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
    17      * @param message
    18      */
    19     public   void sendDataToRabbitMQ(Object message);
    20 
    21     /**
    22      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
    23      * @param message
    24      * @param messagePostProcessor
    25      */
    26     public  void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor);
    27 
    28     /**
    29      * 发送消息到指定的队列中
    30      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    31      * @param routingKey
    32      * @param message
    33      */
    34     public  void sendDataToRabbitMQ(String routingKey, Object message);
    35 
    36     /**
    37      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key
    38      * @param routingKey
    39      * @param message
    40      * @param messagePostProcessor
    41      */
    42     public  void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor);
    43 
    44     /**
    45      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    46      * @param exchange
    47      * @param routingKey
    48      * @param message
    49      */
    50     public  void sendDataToRabbitMQ(String exchange, String routingKey, Object message);
    51 
    52     /**
    53      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    54      * @param exchange
    55      * @param routingKey
    56      * @param message
    57      * @param messagePostProcessor
    58      */
    59     public  void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor);
    60 }

    Mail生产者的实现类:MailProducerImpl.java

      1 package org.seckill.utils.rabbitmq.Impl;
      2 
      3 import org.seckill.utils.rabbitmq.MQProducer;
      4 import org.slf4j.Logger;
      5 import org.slf4j.LoggerFactory;
      6 import org.springframework.amqp.AmqpException;
      7 import org.springframework.amqp.core.AmqpTemplate;
      8 import org.springframework.amqp.core.MessagePostProcessor;
      9 import org.springframework.beans.factory.annotation.Autowired;
     10 import org.springframework.stereotype.Component;
     11 
     12 /**
     13  * <p>Title: org.seckill.utils.rabbitmq.Impl</p>
     14  * <p>Company:东软集团(neusoft)</p>
     15  * <p>Copyright:Copyright(c)2018</p>
     16  * User: 段美林
     17  * Date: 2018/5/30 11:50
     18  * Description: 消息生产者的操作类
     19  */
     20 @Component
     21 public class MailProducerImpl implements MQProducer {
     22 
     23     private static final Logger logger = LoggerFactory.getLogger(MailProducerImpl.class);
     24 
     25     @Autowired
     26     private AmqpTemplate mailAMQPTemplate;
     27 
     28     /**
     29      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     30      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     31      * @param message
     32      */
     33     public void sendDataToRabbitMQ(Object message) {
     34         try {
     35             mailAMQPTemplate.convertAndSend(message);
     36         } catch (AmqpException e) {
     37             logger.error(e.getMessage(),e);
     38         }
     39     }
     40 
     41     /**
     42      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     43      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     44      * @param message
     45      * @param messagePostProcessor
     46      */
     47     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
     48         try {
     49             mailAMQPTemplate.convertAndSend(message,messagePostProcessor);
     50         } catch (AmqpException e) {
     51             logger.error(e.getMessage(),e);
     52         }
     53     }
     54 
     55     /**
     56      * 发送消息到指定的队列中
     57      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     58      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     59      * @param routingKey
     60      * @param message
     61      */
     62     public void sendDataToRabbitMQ(String routingKey, Object message) {
     63         try {
     64             mailAMQPTemplate.convertAndSend(routingKey,message);
     65         } catch (AmqpException e) {
     66             logger.error(e.getMessage(),e);
     67         }
     68     }
     69 
     70     /**
     71      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key
     72      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     73      * @param routingKey
     74      * @param message
     75      * @param messagePostProcessor
     76      */
     77     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
     78         try {
     79             mailAMQPTemplate.convertAndSend(routingKey,message,messagePostProcessor);
     80         } catch (AmqpException e) {
     81             logger.error(e.getMessage(),e);
     82         }
     83     }
     84 
     85     /**
     86      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
     87      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     88      * @param exchange
     89      * @param routingKey
     90      * @param message
     91      */
     92     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
     93         try {
     94             mailAMQPTemplate.convertAndSend(exchange,routingKey,message);
     95         } catch (AmqpException e) {
     96             logger.error(e.getMessage(),e);
     97         }
     98     }
     99 
    100     /**
    101      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    102      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    103      * @param exchange
    104      * @param routingKey
    105      * @param message
    106      * @param messagePostProcessor
    107      */
    108     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
    109         try {
    110             mailAMQPTemplate.convertAndSend(exchange,routingKey,message,messagePostProcessor);
    111         } catch (AmqpException e) {
    112             logger.error(e.getMessage(),e);
    113         }
    114     }
    115 }

    短信发送队列的生产者实现类:PhoneProducerImpl.java

      1 package org.seckill.utils.rabbitmq.Impl;
      2 
      3 import org.seckill.utils.rabbitmq.MQProducer;
      4 import org.slf4j.Logger;
      5 import org.slf4j.LoggerFactory;
      6 import org.springframework.amqp.AmqpException;
      7 import org.springframework.amqp.core.AmqpTemplate;
      8 import org.springframework.amqp.core.MessagePostProcessor;
      9 import org.springframework.beans.factory.annotation.Autowired;
     10 import org.springframework.stereotype.Component;
     11 
     12 /**
     13  * <p>Title: org.seckill.utils.rabbitmq.Impl</p>
     14  * <p>Company:东软集团(neusoft)</p>
     15  * <p>Copyright:Copyright(c)2018</p>
     16  * User: 段美林
     17  * Date: 2018/5/30 11:50
     18  * Description: 消息生产者的操作类
     19  */
     20 @Component
     21 public class PhoneProducerImpl implements MQProducer {
     22 
     23     private static final Logger logger = LoggerFactory.getLogger(PhoneProducerImpl.class);
     24 
     25     @Autowired
     26     private AmqpTemplate phoneAMQPTemplate;
     27 
     28     /**
     29      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     30      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     31      * @param message
     32      */
     33     public void sendDataToRabbitMQ(Object message) {
     34         try {
     35             phoneAMQPTemplate.convertAndSend(message);
     36         } catch (AmqpException e) {
     37             logger.error(e.getMessage(),e);
     38         }
     39     }
     40 
     41     /**
     42      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
     43      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     44      * @param message
     45      * @param messagePostProcessor
     46      */
     47     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
     48         try {
     49             phoneAMQPTemplate.convertAndSend(message,messagePostProcessor);
     50         } catch (AmqpException e) {
     51             logger.error(e.getMessage(),e);
     52         }
     53     }
     54 
     55     /**
     56      * 发送消息到指定的队列中
     57      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
     58      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     59      * @param routingKey
     60      * @param message
     61      */
     62     public void sendDataToRabbitMQ(String routingKey, Object message) {
     63         try {
     64             phoneAMQPTemplate.convertAndSend(routingKey,message);
     65         } catch (AmqpException e) {
     66             logger.error(e.getMessage(),e);
     67         }
     68     }
     69 
     70     /**
     71      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key
     72      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     73      * @param routingKey
     74      * @param message
     75      * @param messagePostProcessor
     76      */
     77     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
     78         try {
     79             phoneAMQPTemplate.convertAndSend(routingKey,message,messagePostProcessor);
     80         } catch (AmqpException e) {
     81             logger.error(e.getMessage(),e);
     82         }
     83     }
     84 
     85     /**
     86      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
     87      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
     88      * @param exchange
     89      * @param routingKey
     90      * @param message
     91      */
     92     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
     93         try {
     94             phoneAMQPTemplate.convertAndSend(exchange,routingKey,message);
     95         } catch (AmqpException e) {
     96             logger.error(e.getMessage(),e);
     97         }
     98     }
     99 
    100     /**
    101      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
    102      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    103      * @param exchange
    104      * @param routingKey
    105      * @param message
    106      * @param messagePostProcessor
    107      */
    108     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
    109         try {
    110             phoneAMQPTemplate.convertAndSend(exchange,routingKey,message,messagePostProcessor);
    111         } catch (AmqpException e) {
    112             logger.error(e.getMessage(),e);
    113         }
    114     }
    115 }

    以下为:消费者相关的实现类:=======================================================================

    Mail消息消费者实现类 MailConsumerListener.java

     1 package org.seckill.rabbitmqListener.mail;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import org.slf4j.Logger;
     5 import org.slf4j.LoggerFactory;
     6 import org.springframework.amqp.core.Message;
     7 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
     8 
     9 /**
    10  * <p>Title: org.seckill.rabbitmqListener</p>
    11  * <p>Company:东软集团(neusoft)</p>
    12  * <p>Copyright:Copyright(c)2018</p>
    13  * User: 段美林
    14  * Date: 2018/5/30 17:55
    15  * Description: 消费方实现类
    16  */
    17 public class MailConsumerListener implements ChannelAwareMessageListener {
    18 
    19     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    20 
    21     public void onMessage(Message message, Channel channel) throws Exception {
    22         try{
    23             logger.info("mail------->consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));
    24             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    25         }catch(Exception e){
    26             logger.error(e.getMessage(),e);
    27             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
    28         }
    29     }
    30 }

    Mail消息确认实现类:MailConfirmCallBackListener.java

     1 package org.seckill.rabbitmqListener.mail;
     2 
     3 import org.slf4j.Logger;
     4 import org.slf4j.LoggerFactory;
     5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
     6 import org.springframework.amqp.rabbit.support.CorrelationData;
     7 
     8 /**
     9  * <p>Title: org.seckill.rabbitmqListener</p>
    10  * <p>Company:东软集团(neusoft)</p>
    11  * <p>Copyright:Copyright(c)2018</p>
    12  * User: 段美林
    13  * Date: 2018/5/30 17:59
    14  * Description: 确认后回调方法
    15  */
    16 public class MailConfirmCallBackListener implements ConfirmCallback {
    17 
    18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    19 
    20     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    21         logger.info("mail----------->confirm--:correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
    22     }
    23 }

    Mail消息发送失败回调实现类:MailFailedCallBackListener.java

     1 package org.seckill.rabbitmqListener.mail;
     2 
     3 import org.slf4j.Logger;
     4 import org.slf4j.LoggerFactory;
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
     7 
     8 /**
     9  * <p>Title: org.seckill.rabbitmqListener</p>
    10  * <p>Company:东软集团(neusoft)</p>
    11  * <p>Copyright:Copyright(c)2018</p>
    12  * User: 段美林
    13  * Date: 2018/5/30 18:03
    14  * Description: 失败后return回调
    15  */
    16 public class MailFailedCallBackListener implements ReturnCallback {
    17 
    18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    19 
    20     public void returnedMessage(Message message, int replyCode,
    21                                 String replyText, String exchange,
    22                                 String routingKey) {
    23 
    24         logger.info("Mail------------->return--message:" +
    25                 new String(message.getBody()) +
    26                 ",replyCode:" + replyCode + ",replyText:" + replyText +
    27                 ",exchange:" + exchange + ",routingKey:" + routingKey);
    28     }
    29 }

    5、Controller层的测试类:

    RabbitmqController.java

     1 package org.seckill.web;
     2 
     3 import org.seckill.dto.SeckillResult;
     4 import org.seckill.entity.Seckill;
     5 import org.seckill.utils.rabbitmq.Impl.MailProducerImpl;
     6 import org.seckill.utils.rabbitmq.Impl.PhoneProducerImpl;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 import org.springframework.beans.factory.annotation.Autowired;
    10 import org.springframework.beans.factory.annotation.Value;
    11 import org.springframework.stereotype.Controller;
    12 import org.springframework.web.bind.annotation.RequestMapping;
    13 import org.springframework.web.bind.annotation.ResponseBody;
    14 
    15 import java.util.Date;
    16 import java.util.HashMap;
    17 import java.util.Map;
    18 
    19 /**
    20  * <p>Title: org.seckill.web</p>
    21  * <p>Company:东软集团(neusoft)</p>
    22  * <p>Copyright:Copyright(c)2018</p>
    23  * User: 段美林
    24  * Date: 2018/5/30 17:33
    25  * Description: 消息队列测试
    26  */
    27 @Controller
    28 @RequestMapping("/rabbitmq")
    29 public class RabbitmqController {
    30 
    31     private final Logger logger = LoggerFactory.getLogger(this.getClass());
    32 
    33     @Value("${mail.exchange.key}")
    34     private  String queue_key;
    35 
    36     @Value("${phone.topic.key.more}")
    37     private  String phone_key_more;
    38 
    39     @Value("${phone.topic.key}")
    40     private  String phone_key;
    41 
    42     @Autowired
    43     private MailProducerImpl mailProducer;
    44 
    45     @Autowired
    46     private PhoneProducerImpl phoneProducer;
    47 
    48     /**
    49      * @Description: 消息队列
    50      * @Author:
    51      * @CreateTime:
    52      */
    53     @ResponseBody
    54     @RequestMapping("/sendMailQueue")
    55     public SeckillResult<Long> testMailQueue() {
    56         SeckillResult<Long> result = null;
    57         Date now = new Date();
    58         try {
    59             Seckill seckill = new Seckill();
    60             seckill.setSeckillId(19339387);
    61             seckill.setName("duanml");
    62             mailProducer.sendDataToRabbitMQ(queue_key, seckill);
    63             result = new SeckillResult<Long>(true, now.getTime());
    64         } catch (Exception e) {
    65             logger.error(e.getMessage(), e);
    66         }
    67         return result;
    68     }
    69 
    70     /**
    71      * @Description: 消息队列
    72      * @Author:
    73      * @CreateTime:
    74      */
    75     @ResponseBody
    76     @RequestMapping("/sendPhoneQueue")
    77     public SeckillResult<Long> testPhoneQueue() {
    78         SeckillResult<Long> result = null;
    79         Date now = new Date();
    80         try {
    81             Seckill seckill = new Seckill();
    82 
    83             for (int i = 0; i < 10 ; i++) {
    84                 seckill.setSeckillId(1922339387+i);
    85                 seckill.setName("caijuan" + i);
    86                 phoneProducer.sendDataToRabbitMQ(phone_key, seckill);
    87             }
    88             seckill.setSeckillId(1240943092);
    89             seckill.setName("yinhaiyan");
    90             phoneProducer.sendDataToRabbitMQ(phone_key_more, seckill);
    91 
    92             result = new SeckillResult<Long>(true, now.getTime());
    93         } catch (Exception e) {
    94             logger.error(e.getMessage(), e);
    95         }
    96         return result;
    97     }
    98 
    99 }

    运行项目

    测试地址输入:测试Mail

    测试phone




  • 相关阅读:
    Vue生命周期(转)
    Gulp的简单使用
    webpack的简单使用
    面试----手写正则表达式
    面试----你可以手写一个promise吗
    baidu.com跳转www.baidu.com
    php 操作时间、日期类函数
    php操作文件类的函数
    sphinx搜索 笔记
    bash下输入命令的几个常用快捷键
  • 原文地址:https://www.cnblogs.com/yinfengjiujian/p/9117600.html
Copyright © 2011-2022 走看看