zoukankan      html  css  js  c++  java
  • RabbitMQ整合spring

      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <beans xmlns="http://www.springframework.org/schema/beans"
      3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
      4     xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
      5     xmlns:jee="http://www.springframework.org/schema/jee" xmlns:aop="http://www.springframework.org/schema/aop"
      6     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
      7     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      8     xsi:schemaLocation="http://www.springframework.org/schema/beans
      9      http://www.springframework.org/schema/beans/spring-beans.xsd
     10       http://www.springframework.org/schema/context
     11        http://www.springframework.org/schema/context/spring-context-4.3.xsd
     12         http://www.springframework.org/schema/mvc
     13          http://www.springframework.org/schema/mvc/spring-mvc.xsd
     14             http://www.springframework.org/schema/tx
     15             http://www.springframework.org/schema/tx/spring-tx.xsd
     16             http://www.springframework.org/schema/aop
     17             http://www.springframework.org/schema/aop/spring-aop.xsd
     18             http://www.springframework.org/schema/task
     19         http://www.springframework.org/schema/task/spring-task.xsd
     20             http://www.springframework.org/schema/rabbit
     21             http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
     22 
     23     <description>rabbitmq 连接服务配置</description>
     24     <!-- 不适用【发布确认】连接配置 -->
     25     <rabbit:connection-factory id="rabbitConnectionFactory"
     26         host="172.18.112.102" username="woms" password="woms" port="5672"
     27         virtual-host="lingyi" channel-cache-size="25" cache-mode="CHANNEL" publisher-confirms="true" publisher-returns="true" connection-timeout="200"/>
     28 
     29 
     30 
     31  <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
     32         <property name="backOffPolicy">
     33             <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
     34                 <property name="initialInterval" value="200" />
     35                 <property name="maxInterval" value="30000" />
     36             </bean>
     37         </property>
     38         <property name="retryPolicy">
     39             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
     40                 <property name="maxAttempts" value="5"/>
     41             </bean>
     42         </property>
     43     </bean>
     44 
     45 
     46 
     47     <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 如果使用多exchange必须配置declared-by="connectAdmin" -->
     48     <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>
     49 
     50     <rabbit:template id="ampqTemplate" connection-factory="connectionFactory"
     51         exchange="test-mq-exchange" return-callback="sendReturnCallback"
     52         message-converter="jsonMessageConverter" routing-key="test_queue_key"
     53         mandatory="true" confirm-callback="confirmCallback" retry-template="retryTemplate"/>
     54 
     55 
     56     <bean id="confirmCallback" class="ly.net.rabbitmq.MsgSendConfirmCallBack" />
     57     <bean id="sendReturnCallback" class="ly.net.rabbitmq.MsgSendReturnCallback" />
     58     <!-- 消息对象json转换类 -->
     59     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
     60 
     61 
     62 
     63 
     64 
     65 
     66 
     67 
     68 
     69 
     70 
     71 
     72 
     73 
     74     <!-- queue配置 -->
     75     <!-- durable:是否持久化 -->
     76     <!-- exclusive: 仅创建者可以使用的私有队列,断开后自动删除 -->
     77     <!-- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
     78     <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" declared-by="rabbitAdmin" />
     79 
     80 
     81 
     82 
     83     <!-- exchange配置 -->
     84     <!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 -->
     85     <!-- rabbit:binding:设置消息queue匹配的key -->
     86     <rabbit:direct-exchange name="test-mq-exchange"
     87         durable="true" auto-delete="false" id="test-mq-exchange" declared-by="rabbitAdmin">
     88         <rabbit:bindings>
     89             <rabbit:binding queue="test_queue_key" key="test_queue_key" />
     90         </rabbit:bindings>
     91     </rabbit:direct-exchange>
     92 
     93     <!-- <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> -->
     94     <!-- <rabbit:bindings> -->
     95     <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
     96     <!-- <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> -->
     97     <!-- </rabbit:bindings> -->
     98     <!-- </rabbit:topic-exchange> -->
     99 
    100 
    101     <bean id="mqConsumer" class="ly.net.rabbitmq.MQConsumer" />
    102     <bean id="mqConsumer1" class="ly.net.rabbitmq.MQConsumerManual" />
    103 
    104     <!-- listener配置 消费者 自动确认 -->
    105     <!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 -->
    106     <rabbit:listener-container
    107         connection-factory="connectionFactory" acknowledge="auto"
    108         message-converter="jsonMessageConverter">
    109         <rabbit:listener queues="test_queue_key" ref="mqConsumer" />
    110     </rabbit:listener-container>
    111     <!-- 消费者 手动确认 -->
    112     <rabbit:listener-container
    113         connection-factory="connectionFactory" acknowledge="manual">
    114         <rabbit:listener queues="test_queue_key" ref="mqConsumer1" />
    115     </rabbit:listener-container>
    116 
    117 
    118 
    119 
    120 
    121 
    122 
    123 </beans>
     1 package ly.net.rabbitmq;
     2 
     3 
     4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     5 import org.springframework.amqp.rabbit.support.CorrelationData;
     6 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
     7 
     8     @Override
     9     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    10         // TODO Auto-generated method stub
    11         if (ack) {  
    12             System.out.println("消息确认成功");  
    13         } else {  
    14             //处理丢失的消息  
    15             System.out.println("消息确认失败,"+cause);  
    16         } 
    17     }  
    18     
    19 } 
    package ly.net.rabbitmq;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.beans.factory.annotation.Autowired;
    
    public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback{
        @Autowired
        private RabbitTemplate errorTemplate;
        
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            String msgJson  = new String(message.getBody());  
            System.out.println("Returned Message:"+msgJson); 
            
            //重新发布
    //        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
    //        Throwable cause = new Exception(new Exception("route_fail_and_republish"));
    //        recoverer.recover(message,cause);
    //        System.out.println("Returned Message:"+replyText);
    //        
        }
    
    }
     1 package ly.net.rabbitmq;
     2 
     3 import org.springframework.amqp.core.Message;
     4 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
     5 
     6 import com.rabbitmq.client.Channel;
     7 
     8 public class MQConsumerManual implements ChannelAwareMessageListener {
     9 
    10     @Override
    11     public void onMessage(Message message, Channel channel) throws Exception {
    12         // TODO Auto-generated method stub
    13         //手动确认
    14         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    15     }
    16 
    17 }
    @Service
    public class MQProducerImpl implements MQProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        private final static Logger LOGGER = Logger.getLogger(MQProducerImpl.class);
       /*
        * convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
        * 原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
        */
        @Override
        public void sendDataToQueue(String queueKey, Object object) {
            try {
                amqpTemplate.convertAndSend(object);
                
            } catch (Exception e) {
                LOGGER.error(e);
            }
    
        }
    }
    public interface MQProducer {
        /**
         * 发送消息到指定队列
         * @param queueKey
         * @param object
         */
        public void sendDataToQueue(String queueKey, Object object);
    }
  • 相关阅读:
    Spring Boot中整合Sharding-JDBC读写分离示例
    Spring Boot Admin2.X监控的服务context-path问题
    Spring Boot中整合Sharding-JDBC单库分表示例
    Spring Cloud Gateway 之 AddRequestHeader GatewayFilter Factory
    程序猿:论学习方式的重要性
    如何使用mybatis《三》
    如何使用mybatis《二》
    如何使用mybatis《一》
    mybatis中自建的类型别名
    mybatis中#和$符号的区别
  • 原文地址:https://www.cnblogs.com/woms/p/7040902.html
Copyright © 2011-2022 走看看