zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结(7)——Spring整合RabbitMQ实例

    1.RabbitMQ简介

    RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。 
    官网:http://www.rabbitmq.com/

    2.Spring集成RabbitMQ

    2.1 maven配置

    //pom.xml
    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
    

    2.2 rabbmitmq配置文件

    //rabbitmq-config.properties
    mq.host=127.0.0.1
    mq.username=test
    mq.password=123456
    mq.port=5672
    mq.vhost=testmq
    

    2.3 Spring配置

    //application-mq.xml
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    
        <description>rabbitmq 连接服务配置</description>
        <!-- 连接配置 -->
        <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- spring template声明-->
        <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter" />
    
        <!-- 消息对象json转换类 -->
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  
    </beans>
    

    3. 在Spring中使用RabbitMQ

    3.1 申明一个消息队列Queue

    //application-mq.xml
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    

    说明: 

    durable:是否持久化

    exclusive: 仅创建者可以使用的私有队列,断开后自动删除

    auto_delete: 当所有消费客户端连接断开后,是否自动删除队列

    3.2 交换机定义

    //application-mq.xml
    <rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    

    说明: 

    rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 

    rabbit:binding:设置消息queue匹配的key

    3.3 发送消息Producer

    //MQProducer.java
    public interface MQProducer {
        /**
         * 发送消息到指定队列
         * @param queueKey
         * @param object
         */
        public void sendDataToQueue(String queueKey, Object object);
    }
    


    @Service
    public class MQProducerImpl implements MQProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        private final static Logger LOGGER = Logger.getLogger(MQProducerImpl.class);
        /* (non-Javadoc)
         * @see com.stnts.tita.rm.api.mq.MQProducer#sendDataToQueue(java.lang.String, java.lang.Object)
         */
        @Override
        public void sendDataToQueue(String queueKey, Object object) {
            try {
                amqpTemplate.convertAndSend(queueKey, object);
            } catch (Exception e) {
                LOGGER.error(e);
            }
    
        }
    }
    

    说明: 

    convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.

    3.4 异步接收消息Consumer

    定义监听器

    //QueueListenter.java
    @Component
    public class QueueListenter implements MessageListener {
    
        @Override
        public void onMessage(Message msg) {
            try{
                System.out.print(msg.toString());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    
    }
    

    监听配置

    //application-mq.xml
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue" ref="queueListenter"/>
    </rabbit:listener-container>
    

    说明: 

    queues:监听的队列,多个的话用逗号(,)分隔 

    ref:监听器

    3.5 JUnit测试

    //TestQueue.java
    @RunWith(value = SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {
        "classpath:/ApplicationContext/ApplicationContext-mq.xml"})
    
    public class TestQueue{
        @Autowired
        MQProducer mqProducer;
    
        final String queue_key = "test_queue";
    
        @Test
        public void send(){
            Map<String,Object> msg = new HashMap()<>;
            msg.put("data","hello,rabbmitmq!");
            mqProducer.sendDataToQueue(query_key,msg);
        }
    }
    

    运行测试程序,Run with JUnit,会发送一条消息到test_queue,监听器监听到消息后,打印出消息。

    至此,已经完成了Spring和RabbmitMQ集成,配置,和使用。

  • 相关阅读:
    68
    56
    Django manager 命令笔记
    Django 执行 manage 命令方式
    Django 连接 Mysql (8.0.16) 失败
    Python django 安装 mysqlclient 失败
    H.264 SODB RBSP EBSP的区别
    FFmpeg—— Bitstream Filters 作用
    MySQL 远程连接问题 (Windows Server)
    MySQL 笔记
  • 原文地址:https://www.cnblogs.com/zhanghaiyang/p/7213519.html
Copyright © 2011-2022 走看看