zoukankan      html  css  js  c++  java
  • spring整合消息队列rabbitmq

    spring大家太熟,就不多说了

    rabbitmq是一个amqp的队列服务实现,具体介绍请参考这篇文章:http://lynnkong.iteye.com/blog/1699684

    本文侧重介绍如何将rabbitmq整合到项目中

    ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..

    1.首先是生产者配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Producer-side Spring context: declares the broker connection, the queue,
         the direct exchange with its binding, and the template used to publish. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context
                    http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/rabbit
                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


       <!-- Broker connection settings -->
       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
            password="guest" port="5672"  />

       <rabbit:admin connection-factory="connectionFactory"/>

       <!-- Queue declaration -->
       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>


       <!-- Exchange declaration: binds queue_one to the exchange under routing key queue_one_key -->
        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
            <rabbit:bindings>
                <rabbit:binding queue="queue_one" key="queue_one_key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>

        <!-- Message converter: turns objects sent by the producer into JSON before they
             are put on the queue. The author replaces Spring AMQP's Jackson-based JSON
             converter with a fastjson-based implementation, citing fastjson's speed. -->
        <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>

        <!-- Template declaration: publishes to my-mq-exchange using the converter above -->
        <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
    </beans>

    2.fastjson MessageConverter插件实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    import java.io.UnsupportedEncodingException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractMessageConverter;
    import org.springframework.amqp.support.converter.MessageConversionException;

    import fe.json.FastJson;
     
    public class FastJsonMessageConverter  extends AbstractMessageConverter {
        private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
     
        public static final String DEFAULT_CHARSET = "UTF-8";
     
        private volatile String defaultCharset = DEFAULT_CHARSET;
         
        public FastJsonMessageConverter() {
            super();
            //init();
        }
         
        public void setDefaultCharset(String defaultCharset) {
            this.defaultCharset = (defaultCharset != null) ? defaultCharset
                    : DEFAULT_CHARSET;
        }
         
        public Object fromMessage(Message message)
                throws MessageConversionException {
            return null;
        }
         
        public <T> T fromMessage(Message message,T t) {
            String json = "";
            try {
                json = new String(message.getBody(),defaultCharset);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return (T) FastJson.fromJson(json, t.getClass());
        }  
         
     
        protected Message createMessage(Object objectToConvert,
                MessageProperties messageProperties)
                throws MessageConversionException {
            byte[] bytes = null;
            try {
                String jsonString = FastJson.toJson(objectToConvert);
                bytes = jsonString.getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException e) {
                throw new MessageConversionException(
                        "Failed to convert Message content", e);
            }
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            messageProperties.setContentEncoding(this.defaultCharset);
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            return new Message(bytes, messageProperties);
     
        }
    }

    3.生产者端调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import java.util.List;
     
    import org.springframework.amqp.core.AmqpTemplate;
     
     
    public class MyMqGatway {
         
        @Autowired
        private AmqpTemplate amqpTemplate;
         
        public void sendDataToCrQueue(Object obj) {
            amqpTemplate.convertAndSend("queue_one_key", obj);
        }  
    }

    4.消费者端配置(与生产者端大同小异)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Consumer-side Spring context: declares the same connection, queue and exchange
         binding as the producer side, plus a listener container for queue_one. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context
                    http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/rabbit
                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
     
         
       <!-- Broker connection settings -->
       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
            password="guest" port="5672"  />
             
       <rabbit:admin connection-factory="connectionFactory"/>
        
       <!-- Queue declaration -->
       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
        
        
       <!-- Exchange declaration: binds queue_one to the exchange under routing key queue_one_key -->
        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
            <rabbit:bindings>
                <rabbit:binding queue="queue_one" key="queue_one_key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
     
         
          
        <!-- Queue listener (observer style): when a message arrives on queue_one, the
             listener object registered for that queue is notified. -->
        <!-- NOTE(review): the beans "taskExecutor" and "queueOneLitener" are referenced
             below but are not declared in this file as shown; they presumably have to be
             defined elsewhere in the context. TODO confirm. -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
            <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
        </rabbit:listener-container>
    </beans>

    5.消费者端调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
     
    public class QueueOneLitener implements  MessageListener{
        @Override
        public void onMessage(Message message) {
            System.out.println(" data :" + message.getBody());
        }
    }

    6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可

  • 相关阅读:
    MySql创建库 Challenge
    未能启用约束。一行或多行中包含违反非空、唯一或外键约束的值的解决办法.
    小总结:用反射机制创建的分配数据分配器
    工厂模式的反思
    单机安装“完整”SharePoint 2010
    作业调度框架 Quartz.NET 2.0 StepByStep(2)
    UI线程同步
    每日见闻(一)
    作业调度框架 Quartz.NET 2.0 StepByStep
    基础算法(ACwing)
  • 原文地址:https://www.cnblogs.com/fyx158497308/p/4703860.html
Copyright © 2011-2022 走看看