zoukankan      html  css  js  c++  java
  • MQ实战

    MQ是什么?

    MQ(消息队列)是一种跨进程的通信机制,用于上下游传递消息。

    MQ的优点

    异步处理,代码解藕。

    spring中集成MQ的实现

    1. xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
           xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
          http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
        <!-- windq配置start -->
        <!-- 生产者配置 -->
        <!-- JMS 连接工厂,必须配置destroy-method,会在停应用时,显式地销毁资源 -->
        <bean id="windqConnectionFactory" class="com.xxx.windq.jms.WindQConnectionFactory" destroy-method="destroy">
        </bean>
        <!-- 定义队列 -->
        <bean id="fundDetailRequestQueue" class="com.xxx.windq.jms.destination.WindQQueue">
            <!--请求资金明细的队列名称-->
            <constructor-arg value="FUND_DETAIL_REQUEST_TTMS"/>
        </bean>
        <!-- 缓存session连接工厂,只可用于jmsTemplate发送消息,不可用于MessageListenerContainer -->
        <bean id="cacheConnectionFactory" class="com.xxx.windq.spring.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="windqConnectionFactory"/>
            <!-- 缓存住的会话数,如果并发峰值超出此阈值仍然会新建会话,只是这些新建的会话在idle后会被关闭。此值应填写正常情况下的并发量 -->
            <property name="sessionCacheSize" value="20"/>
        </bean>
        <bean id="windqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cacheConnectionFactory"/>
        </bean>
        <!-- 用来发送消息的Service实例 -->
        <bean id="jmsFundDetailRequestSender" class="com.xxx.fms.remit.jms.JmsFundDetailRequestSender">
            <property name="jmsTemplate" ref="windqJmsTemplate"/>
            <!-- 此处关联定义的队列或主题 -->
            <property name="queueOrTopic" ref="fundDetailRequestQueue"/>
        </bean>
    
        <!-- 消费者配置 -->
        <!-- 用来接收消息的Listener实例 -->
        <bean id="jmsFundDetailListener" class="com.xxx.fms.remit.jms.JmsFundDetailListener"/>
        <bean id="listenerContainer" class="com.xxx.windq.spring.DefaultMessageListenerContainer">
            <!-- 使用WINDQ原生的连接工厂,不要使用cachingConnectionFactory,因为MLC自己内部有缓存机制 -->
            <property name="connectionFactory" ref="windqConnectionFactory"/>
            <!-- 填写上述定义中的实际要消费的队列(该队列由资金系统提供) -->
            <property name="destination" ref="myQueue"/>
            <!-- 业务处理类 -->
            <property name="messageListener" ref="jmsFundDetailListener"/>
            <!--单个JVM的并发consumer的数量:最小-最大。例如1-1,表示最小的和最大的并发消费者数量都是1 -->
            <property name="concurrency" value="1-1"/>
            <!-- 打开JMS会话事务(非JTA事务),session类型为transaction -->
            <property name="sessionTransacted" value="true"/>
        </bean>
        <!-- windq配置end -->
    </beans>

    2. 生产者JmsFundDetailRequestSender实现:

    @Component("jmsFundDetailRequestSender")
    class JmsFundDetailRequestSender {
    
        private static Logger LOGGER = LoggerFactory.getLogger(JmsFundDetailRequestSender.class);
    
        private Destination queueOrTopic;
    
        private JmsTemplate jmsTemplate;
    
        /**
         * 向指定队列发送消息
         * @param message
         */
        public void sendMessage(final Serializable message) {
            LOGGER.info("发送资金明细查询windq请求,sendMessage:{}", ToStringBuilder.reflectionToString(message));
            jmsTemplate.send(queueOrTopic, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    ObjectMessage objectMessage = session.createObjectMessage();
                    // 如果需要设置以下任一属性头,就调用下clearProperties()方法,默认是不允许设置属性的,这个语句会打开属性变为可设置
                    objectMessage.clearProperties();
                    // 定位本条消息的业务字段,用于消息日志查询。例如如果填写订单号,那么通过订单号就能查询到这条消息。非必填字段
                    objectMessage.setStringProperty(MessageHeader.WINDQ_MSG_ABSTRACT_HEADER, message.toString());
                    // 填写消息体
                    objectMessage.setObject(message);
                    return objectMessage;
                }
            });
        }
    
        public Destination getQueueOrTopic() {
            return queueOrTopic;
        }
    
        public void setQueueOrTopic(Destination queueOrTopic) {
            this.queueOrTopic = queueOrTopic;
        }
    
        public JmsTemplate getJmsTemplate() {
            return jmsTemplate;
        }
    
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    }

    注:代码中用到了匿名内部类,有关匿名内部类的解释,可以查看匿名内部类详解

    3. 消费者JmsFundDetailListener实现:

    @Component("jmsFundDetailListener")
    public class JmsFundDetailListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            if (message != null) {
                // 业务处理代码
            }
        }
    }
    
  • 相关阅读:
    ExtJs之表单(form)
    tf.where
    kuiper流式计算完整实例演示
    centos下搭建kuiper以及kuiper-manager
    Centos搭建EMQX和EMQ-Dashboard(踩坑精华版)
    代码生成器
    [MIT新技术大会]Jeff Bezos把EC2、S3和土耳其机器人描述为亚马逊“11年来的大规模万维网计算”方面的结晶,强调把后台基础设施作为服务
    《商业周刊》封面文章《谷歌和云的智慧》,讲到谷歌的新战略是“把惊人的计算能力放到众人手里”
    C# 连接 Sqlserver2005 Analysis Service的总结
    POJ_1064 二分搜索
  • 原文地址:https://www.cnblogs.com/atai/p/10649662.html
Copyright © 2011-2022 走看看