zoukankan      html  css  js  c++  java
  • 消息队列RabbitMQ与Spring集成

    1.RabbitMQ简介

    RabbitMQ是流行的开源消息队列系统,用Erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。
    官网:http://www.rabbitmq.com/

    2.maven配置

            <!-- RabbitMQ dependencies -->
            <!-- com.rabbitmq:amqp-client, pinned explicitly to version 3.5.1 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.5.1</version>
            </dependency>
            <!-- Spring AMQP's RabbitMQ module; presumably what provides the rabbit: XML namespace
                 and the org.springframework.amqp classes used in this article - confirm against
                 the Spring AMQP docs for this version -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.6.RELEASE</version>
            </dependency>

    3.配置文件

    rabbitmq.properties

    # Broker host; read by the Spring connection factory as ${mq.host}
    mq.host=172.17.22.187
    # Account used to connect to the broker; read as ${mq.username}
    mq.username=remote_user
    # Password for the account above; read as ${mq.password}
    # NOTE(review): example value only - keep real credentials out of version control
    mq.password=123456

    4.Spring配置(application-mq.xml,即后文测试类通过@ContextConfiguration加载的配置文件)

    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
        xsi:schemaLocation="  
            http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-3.1.xsd  
            http://www.springframework.org/schema/rabbit   
            http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">  
    
    <!-- Load rabbitmq.properties from the classpath so the ${mq.*} placeholders below can be resolved -->
    <context:property-placeholder location="classpath:rabbitmq.properties" />
    
        <!-- Configure the connection factory with the parameters used to connect to the RabbitMQ server -->
        <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" /> 
    
        <!-- With the admin declared below, the exchanges and queues defined in this context
             are created automatically on the RabbitMQ server -->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- Define the queue: durable, not auto-deleted, not exclusive -->
        <rabbit:queue id="com.mj.test" name="com.mj.test" durable="true" auto-delete="false" exclusive="false"/>
    
        <!-- Define a direct exchange and bind the com.mj.test queue to it with routing key "hello" -->
        <rabbit:direct-exchange name="myChange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="com.mj.test" key="hello"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        
        <!-- Define the rabbit template used to send and receive messages; its exchange is myChange -->
        <rabbit:template id="myAmqpTemplate" connection-factory="connectionFactory" exchange="myChange" />
        
        <!-- Receiving side: the listener bean, and the container that subscribes it to the com.mj.test queue -->
         <bean id="messageReceiver" class="com.ucs.mq.QueueListenter"></bean>
        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener queues="com.mj.test" ref="messageReceiver"/>
        </rabbit:listener-container>  
        
    </beans>

    5.发送消息Producer

    package com.ucs.mq;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import net.sf.json.JSONSerializer;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import org.springframework.test.context.transaction.TransactionConfiguration;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.ucs.base.BaseTest;
    
    /**
     * Integration test that publishes one JSON message through the {@link AmqpTemplate}
     * wired by {@code classpath:application-mq.xml}.
     *
     * <p>The class is intentionally not annotated with {@code @Transactional} /
     * {@code @TransactionConfiguration}: those annotations make the test framework look up a
     * {@code transactionManager} bean, and this test only loads the messaging context and
     * performs no transactional work, so the lookup adds a dependency on an unrelated bean
     * without any benefit.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations={"classpath:application-mq.xml"})
    public class TestMQ {

        /** Template used for publishing; injected by type from the test context. */
        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Builds a small JSON document ({@code batchNo} plus an {@code item} list) and sends it
         * with routing key {@code "hello"}.
         *
         * @throws Exception if serialization or publishing fails
         */
        @Test
        public void send() throws Exception {
            List<String> items = new ArrayList<String>();
            items.add("1");
            items.add("2");
            items.add("3");

            Map<String, Object> body = new HashMap<String, Object>();
            body.put("batchNo", "递四方速递");
            body.put("item", items);

            // The payload travels as a JSON string; the consumer parses it back with json-lib.
            String payload = JSONSerializer.toJSON(body).toString();

            // Only the routing key is given here, so the exchange is whatever the injected
            // template is configured with.
            amqpTemplate.convertAndSend("hello", payload);
        }
    }

    6.异步接收消息Consumer

    package com.ucs.mq;
    
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    
    /**
     * Message listener that prints the received JSON payload, its {@code batchNo} field and its
     * {@code item} array to standard output.
     *
     * <p>Failures while decoding or parsing a message are logged and not rethrown, so a
     * malformed message never propagates an exception to the caller of {@link #onMessage}.
     */
    public class QueueListenter implements MessageListener {

        /** Charset of the message body; resolved once instead of by name on every message. */
        private static final java.nio.charset.Charset UTF_8 =
                java.nio.charset.Charset.forName("UTF-8");

        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(QueueListenter.class.getName());

        /**
         * Decodes the message body as UTF-8 text, parses it as a JSON object and prints the raw
         * text, the {@code batchNo} value and the {@code item} array, in that order.
         *
         * @param msg the received message; its body is expected to be a UTF-8 encoded JSON object
         */
        @Override
        public void onMessage(Message msg) {
            try {
                // Decode once and reuse the string for both printing and parsing.
                String payload = new String(msg.getBody(), UTF_8);
                System.out.println(payload);

                JSONObject json = JSONObject.fromObject(payload);
                System.out.println(json.get("batchNo").toString());

                JSONArray items = JSONArray.fromObject(json.get("item"));
                System.out.println(items.toString());
            } catch (Exception e) {
                // Listener boundary: report the failure with context instead of a bare stack trace.
                LOG.log(java.util.logging.Level.SEVERE, "Failed to process received message", e);
            }
        }
    }

    7.运行结果(消费者控制台输出)

    {"batchNo":"递四方速递","item":["1","2","3"]}
    递四方速递
    ["1","2","3"]
  • 相关阅读:
    记录ViewPager配合Fragment使用中遇到的一个问题
    StringBuffer类的构造方法
    认识StringBuffer类
    Java中增强for循环的用法
    xml解析案例
    XML的序列化(Serializer)
    文件权限之(介绍,更改,扩展)
    保存数据到sdcard中去
    反编译
    后端——框架——容器框架——spring_core——格式化器
  • 原文地址:https://www.cnblogs.com/crazylqy/p/6567598.html
Copyright © 2011-2022 走看看