zoukankan      html  css  js  c++  java
  • spring整合消息队列rabbitmq

    ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..


     

    1.首先是生产者配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Producer-side Spring context: declares the RabbitMQ connection, one durable queue,
        a durable direct exchange bound to that queue, a JSON message converter and the
        AMQP template used by application code to publish messages.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context
                    http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/rabbit
                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


       <!-- Broker connection settings -->
       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
            password="guest" port="5672"  />

       <!-- Admin bean tied to the connection factory above -->
       <rabbit:admin connection-factory="connectionFactory"/>

       <!-- Queue declaration -->
       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>


       <!-- Exchange declaration and the queue / routing-key binding -->
        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
            <rabbit:bindings>
                <rabbit:binding queue="queue_one" key="queue_one_key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>

        <!-- Message converter: a fastjson-based implementation is used in place of the
             Jackson-based JSON converter that Spring AMQP provides, because the author
             found fastjson faster. It turns produced objects into JSON before they are
             put on the queue. -->
        <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>

        <!-- AMQP template: publishes to my-mq-exchange using the converter above -->
        <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
    </beans>

    2.基于 fastjson 的 MessageConverter 插件实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    import java.io.UnsupportedEncodingException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractMessageConverter;
    import org.springframework.amqp.support.converter.MessageConversionException;
     
    import fe.json.FastJson;
     
    public class FastJsonMessageConverter  extends AbstractMessageConverter {
        private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
     
        public static final String DEFAULT_CHARSET = "UTF-8";
     
        private volatile String defaultCharset = DEFAULT_CHARSET;
         
        public FastJsonMessageConverter() {
            super();
            //init();
        }
         
        public void setDefaultCharset(String defaultCharset) {
            this.defaultCharset = (defaultCharset != null) ? defaultCharset
                    : DEFAULT_CHARSET;
        }
         
        public Object fromMessage(Message message)
                throws MessageConversionException {
            return null;
        }
         
        public <T> T fromMessage(Message message,T t) {
            String json = "";
            try {
                json = new String(message.getBody(),defaultCharset);
            catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return (T) FastJson.fromJson(json, t.getClass());
        }  
         
     
        protected Message createMessage(Object objectToConvert,
                MessageProperties messageProperties)
                throws MessageConversionException {
            byte[] bytes = null;
            try {
                String jsonString = FastJson.toJson(objectToConvert);
                bytes = jsonString.getBytes(this.defaultCharset);
            catch (UnsupportedEncodingException e) {
                throw new MessageConversionException(
                        "Failed to convert Message content", e);
            
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            messageProperties.setContentEncoding(this.defaultCharset);
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            return new Message(bytes, messageProperties);
     
        }
    }

    3.生产者端调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import java.util.List;
     
    import org.springframework.amqp.core.AmqpTemplate;
     
     
    public class MyMqGatway {
         
        @Autowired
        private AmqpTemplate amqpTemplate;
         
        public void sendDataToCrQueue(Object obj) {
            amqpTemplate.convertAndSend("queue_one_key", obj);
        }  
    }

    4.消费者端配置(与生产者端大同小异)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context
                    http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/rabbit
                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
     
         
       <!-- Broker connection settings -->
       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
            password="guest" port="5672"  />
             
       <rabbit:admin connection-factory="connectionFactory"/>
        
       <!-- Queue declaration -->
       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
        
        
       <!-- Exchange declaration and the queue / routing-key binding -->
        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
            <rabbit:bindings>
                <rabbit:binding queue="queue_one" key="queue_one_key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
     
         
          
        <!-- Queue listener (observer style): when a message arrives, the listener object
             registered on the corresponding queue is notified. -->
        <!-- NOTE(review): the beans "taskExecutor" and "queueOneLitener" are referenced here
             but not defined in this file; they presumably come from another context file or
             component scanning - confirm, otherwise the context will fail to start. -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
            <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
        </rabbit:listener-container>
    </beans>

    5.消费者端调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
     
    public class QueueOneLitener implements  MessageListener{
        @Override
        public void onMessage(Message message) {
            System.out.println(" data :" + message.getBody());
        }
    }

    6.在监听模式下,队列中每到达一条消息,对应的监听对象就会被通知一次,因此消费端无法直接做到批量获取、批量入库。解决办法是在消费端维护一个本地临时队列:先把从 MQ 取出的数据存入本地队列,再由后台线程定时批量处理即可。

    转自:http://blog.csdn.net/l192168134/article/details/51210188

  • 相关阅读:
    数据倾斜原理及解决方案
    删除emp_no重复的记录,只保留最小的id对应的记录
    理解HBase面向列存储
    给数据库用户授权(对象多为系统表,如dba可以查看的表)
    SpringBoot里的一些注解
    01背包
    【转】简说GNU, GCC and MinGW (Lu Hongling)
    费马小定理
    欧拉定理
    【转】C中的静态存储区和动态存储区
  • 原文地址:https://www.cnblogs.com/coderdxj/p/7003525.html
Copyright © 2011-2022 走看看