zoukankan      html  css  js  c++  java
  • MQ实战

    MQ是什么?

    MQ(消息队列)是一种跨进程的通信机制,用于上下游传递消息。

    MQ的优点

    异步处理,代码解藕。

    spring中集成MQ的实现

    1. xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
           xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
          http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
        <!-- windq配置start -->
        <!-- 生产者配置 -->
        <!-- JMS 连接工厂,必须配置destroy-method,会在停应用时,显式地销毁资源 -->
        <bean id="windqConnectionFactory" class="com.xxx.windq.jms.WindQConnectionFactory" destroy-method="destroy">
        </bean>
        <!-- 定义队列 -->
        <bean id="fundDetailRequestQueue" class="com.xxx.windq.jms.destination.WindQQueue">
            <!--请求资金明细的队列名称-->
            <constructor-arg value="FUND_DETAIL_REQUEST_TTMS"/>
        </bean>
        <!-- 缓存session连接工厂,只可用于jmsTemplate发送消息,不可用于MessageListenerContainer -->
        <bean id="cacheConnectionFactory" class="com.xxx.windq.spring.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="windqConnectionFactory"/>
            <!-- 缓存住的会话数,如果并发峰值超出此阈值仍然会新建会话,只是这些新建的会话在idle后会被关闭。此值应填写正常情况下的并发量 -->
            <property name="sessionCacheSize" value="20"/>
        </bean>
        <bean id="windqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cacheConnectionFactory"/>
        </bean>
        <!-- 用来发送消息的Service实例 -->
        <bean id="jmsFundDetailRequestSender" class="com.xxx.fms.remit.jms.JmsFundDetailRequestSender">
            <property name="jmsTemplate" ref="windqJmsTemplate"/>
            <!-- 此处关联定义的队列或主题 -->
            <property name="queueOrTopic" ref="fundDetailRequestQueue"/>
        </bean>
    
        <!-- 消费者配置 -->
        <!-- 用来接收消息的Listener实例 -->
        <bean id="jmsFundDetailListener" class="com.xxx.fms.remit.jms.JmsFundDetailListener"/>
        <bean id="listenerContainer" class="com.xxx.windq.spring.DefaultMessageListenerContainer">
            <!-- 使用WINDQ原生的连接工厂,不要使用cachingConnectionFactory,因为MLC自己内部有缓存机制 -->
            <property name="connectionFactory" ref="windqConnectionFactory"/>
            <!-- 填写上述定义中的实际要消费的队列(该队列由资金系统提供) -->
            <property name="destination" ref="myQueue"/>
            <!-- 业务处理类 -->
            <property name="messageListener" ref="jmsFundDetailListener"/>
            <!--单个JVM的并发consumer的数量:最小-最大。例如1-1,表示最小的和最大的并发消费者数量都是1 -->
            <property name="concurrency" value="1-1"/>
            <!-- 打开JMS会话事务(非JTA事务),session类型为transaction -->
            <property name="sessionTransacted" value="true"/>
        </bean>
        <!-- windq配置end -->
    </beans>

    2. 生产者JmsFundDetailRequestSender实现:

    @Component("jmsFundDetailRequestSender")
    class JmsFundDetailRequestSender {
    
        private static Logger LOGGER = LoggerFactory.getLogger(JmsFundDetailRequestSender.class);
    
        private Destination queueOrTopic;
    
        private JmsTemplate jmsTemplate;
    
        /**
         * 向指定队列发送消息
         * @param message
         */
        public void sendMessage(final Serializable message) {
            LOGGER.info("发送资金明细查询windq请求,sendMessage:{}", ToStringBuilder.reflectionToString(message));
            jmsTemplate.send(queueOrTopic, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    ObjectMessage objectMessage = session.createObjectMessage();
                    // 如果需要设置以下任一属性头,就调用下clearProperties()方法,默认是不允许设置属性的,这个语句会打开属性变为可设置
                    objectMessage.clearProperties();
                    // 定位本条消息的业务字段,用于消息日志查询。例如如果填写订单号,那么通过订单号就能查询到这条消息。非必填字段
                    objectMessage.setStringProperty(MessageHeader.WINDQ_MSG_ABSTRACT_HEADER, message.toString());
                    // 填写消息体
                    objectMessage.setObject(message);
                    return objectMessage;
                }
            });
        }
    
        public Destination getQueueOrTopic() {
            return queueOrTopic;
        }
    
        public void setQueueOrTopic(Destination queueOrTopic) {
            this.queueOrTopic = queueOrTopic;
        }
    
        public JmsTemplate getJmsTemplate() {
            return jmsTemplate;
        }
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    }

    注:代码中用到了匿名内部类,有关匿名内部类的解释,可以查看匿名内部类详解

    3. 消费者JmsFundDetailListener实现:

    @Component("jmsFundDetailListener")
    public class JmsFundDetailListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            if (message != null) {
                // 业务处理代码
            }
        }
    }
    
  • 相关阅读:
    Samba 4.0 RC3 发布
    SymmetricDS 3.1.7 发布,数据同步和复制
    Express.js 3.0 发布,Node.js 的高性能封装
    GIFLIB 5.0.1 发布,C语言的GIF处理库
    jQuery UI 1.9.1 发布
    SVN Access Manager 0.5.5.14 发布 SVN 管理工具
    DynamicReports 3.0.3 发布 Java 报表工具
    HttpComponents HttpClient 4.2.2 GA 发布
    AppCan 2.0 正式发布,推移动应用云服务
    Ruby 2.0 的新功能已经冻结
  • 原文地址:https://www.cnblogs.com/atai/p/10649662.html
Copyright © 2011-2022 走看看