zoukankan      html  css  js  c++  java
  • ActiveMQ (三):项目实践

    1. 简单项目demo 

        Com.hoo.mq路径下(除了com.hoo.mq.spring)是普通java中使用activeMQ.   

        Com.hoo.mq.spring路径下是非web环境spring集成activeMQ

          Com.tgb.SpringActivemq路径下是web环境下spring mvc集成activeMQ. 

      

        目录结构如下:

        

      

        github下载地址:https://github.com/Monkey-mi/TestActiveMQ.git

    2. web项目

      2.1 生产者

        spring-activemq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
        
        <!-- 加载activemq的属性配置文件 -->
        <context:property-placeholder location="classpath:config/activemq.properties" />
    
        <!-- ActiveMQ 连接工厂 -->
         <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://localhost:61618" userName="admin" password="topsun"  />
    
        <!-- Spring Caching连接工厂 -->
         <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory 
         org.springframework.jms.connection.CachingConnectionFactory
         org.apache.activemq.pool.PooledConnectionFactory
         -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
              <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
              <!-- 同上,同理 -->
            <!-- <constructor-arg ref="amqConnectionFactory" /> -->
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="100" />
            <!-- 接收者ID,用于Topic订阅者的永久订阅 clientId 作为客户端的标识,连接同一服务的客户端不能拥有相同的 -->
            <property name="clientId" value="client-B" /> 
        </bean>
        
        <!-- Spring JmsTemplate 的消息生产者 start-->
        
        <!-- 定义JmsTemplate的Queue类型 -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
            <property name="pubSubDomain" value="false" />
            <!-- 订阅消息持久化 -->
            <property name="deliveryPersistent" value="true" />
            <!-- 配置持久化,同上 deliveryPersistent
            <property name="deliveryMode" value="2" />
            -->
        </bean>
        
        <!-- 定义JmsTemplate的Topic类型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="connectionFactory" />
            <!-- pub/sub模型(发布/订阅) -->
            <property name="pubSubDomain" value="true" />
            <!-- 订阅消息持久化 -->
            <property name="deliveryPersistent" value="true" />
            <!-- 配置持久化,同上 deliveryPersistent
            <property name="deliveryMode" value="2" />
            -->
        </bean>
        
        <!--Spring JmsTemplate 的消息生产者 end-->
    
    </beans>  

      QueueSender.java

    package com.outsideasy.activemq.service;
    
    import java.io.Serializable;
    
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.Session;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    import org.springframework.stereotype.Service;
    
    @Component
    public class QSender {
        //队列名
        private static final String QUEUENAME = "outside.order";
        @Autowired
        @Qualifier("jmsQueueTemplate")
        private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean
        
        /**
         * 发送一条消息到指定的队列(目标)
         * @params s 序列化对象 和 receiver 中的getObject对应
         */
        public void orderSend(final Serializable s){
                jmsTemplate.send(QUEUENAME, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(s);
                }
            });
        }
    }

      PurchaseOrderService.java

        public void saveAccpetOrderByID(Map<String,Object> params){
            int status = getPurchaseOrderStatus(params);
            if(status ==10){
                params.put("order_status", 20);
                mapper.updateOrderStatus(params);
                purchaseOrderOperatingService.addPurchaseOrderOperating(params);
                //触发PO确认接单
                PurchaseOrderVo purchaseOrderVo = mapper.getOrderDetailsByID(params);
                PurchaseOrderSender sender = new PurchaseOrderSender();//构造发送数据 这里的对象和接受者对象在项目中的包路径相同,否则无法反序列化
                sender.setPur_order_id(purchaseOrderVo.getPur_order_id());
                sender.setOrder_bh(purchaseOrderVo.getOrder_bh());
                sender.setOrder_status(purchaseOrderVo.getOrder_status());
                sender.setAgreement_bh(purchaseOrderVo.getAgreement_bh());
                sender.setR_opreate_dt(purchaseOrderVo.getR_opreate_dt());
                sender.setSource_type(purchaseOrderVo.getSource_type());
                qSender.orderSend(sender);//调用activemq发送消息对象
            }else{
                
            }
        }

      2.2 消费者

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
        <context:property-placeholder location="classpath:sysconfig/activemq.properties" />
        <bean id="qReceiver" class="com.outsideasy.activemq.service.QReceiver"/>
    
        <!-- ActiveMQ 连接工厂 -->
         <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://localhost:61618" userName="admin" password="topsun" /> 
    
        <!-- Spring Caching连接工厂 -->
         <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory 
         org.springframework.jms.connection.CachingConnectionFactory
         org.apache.activemq.pool.PooledConnectionFactory
         -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
              <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
              <!-- 同上,同理 -->
            <!-- <constructor-arg ref="amqConnectionFactory" /> -->
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="100" />
            <!-- 接收者ID,用于Topic订阅者的永久订阅-->
            <property name="clientId" value="client-C" /> 
        </bean>
    
        
        <!-- 消息消费者 start-->
        <!-- 定义Queue监听器 -->
        <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="outside.order" ref="qReceiver"/>
            <!-- <jms:listener destination="test.queue" ref="queueReceiver2"/> -->
        </jms:listener-container>
        
        <!-- 定义Topic监听器 -->
        <jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="client-C">
            <!-- 注意:定义 subscription(即:durableSubscriptionName)持久化主题名字 -->
            <!-- <jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/>
            <jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/> -->
        </jms:listener-container>
        
        <!-- 消息消费者 end -->
    </beans>  

      QReceiver.java

    package com.outsideasy.activemq.service;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    
    import org.springframework.stereotype.Component;
    
    import com.outsideasy.activemq.model.PurchaseOrderSender;
    
    @Component
    public class QReceiver implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            try {
                /*System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());*/
                ObjectMessage objmsg=(ObjectMessage) message;
                PurchaseOrderSender purchaseOrderSender=(PurchaseOrderSender)objmsg.getObject(); //此消息对象和发送者的对象路径相同
                System.out.println("QueueReceiver1接收到消息:"+purchaseOrderVo.getPur_order_id());
                System.out.println("****:"+purchaseOrderVo.toString());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }

    3. 注意事项

      3.1. 序列化与反序列化问题;

        传输为对象类型为对象:

          生产者和消费者对象的包路径必须一致,否则无法反序列化。

        传输为Map类型:

          value值必须为Integer 、Double 、String...自定义对象无法作为Value传输。否则提示如下错误:

          Only objectified primitive objects, String, Map and List types are allowed

      3.2.权限配置

        3.2.1 服务访问权限及账号密码设置

        在%ACTIVEMQ_HOME%conf中activemq.xml中<xml 元素参见:ActiveMQ Xml Reference>

    <broker ...>
            <plugins>
                <simpleAuthenticationPlugin>
                    <users>         
                        <authenticationUser username="admin" password="topsun"
                        groups="admins,publishers,consumers"/>
                        
                        <authenticationUser username="publisher" password="topsun"
                        groups="publishers,consumers"/>
                        
                        <authenticationUser username="consumer" password="topsun"
                        groups="consumers"/>
                        
                        <authenticationUser username="guest" password="topsun"
                        groups="guests"/>
                    </users>
                </simpleAuthenticationPlugin>
                
                <authorizationPlugin>
                    <map>
                        <authorizationMap>
                            <authorizationEntries>
                              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                              <authorizationEntry queue="publishers.>" read="publishers" write="publishers" admin="admins" />
                              <authorizationEntry queue="consumers." read="consumers" write="consumers" admin="admins" /> 
                              <authorizationEntry queue="test" read="guests" write="guests" />
                            </authorizationEntries>
                        </authorizationMap>
                    </map>
                </authorizationPlugin>                      
            </plugins>
    
    </broker>

      

        3.2.2 ActiveMQ 服务网页控制台(web console)账号密码设置

          需要在%ACTIVEMQ_HOME%in中jetty-realm.properties中配置,如下:  

    ## ---------------------------------------------------------------------------
    ## Licensed to the Apache Software Foundation (ASF) under one or more
    ## contributor license agreements.  See the NOTICE file distributed with
    ## this work for additional information regarding copyright ownership.
    ## The ASF licenses this file to You under the Apache License, Version 2.0
    ## (the "License"); you may not use this file except in compliance with
    ## the License.  You may obtain a copy of the License at
    ## 
    ## http://www.apache.org/licenses/LICENSE-2.0
    ## 
    ## Unless required by applicable law or agreed to in writing, software
    ## distributed under the License is distributed on an "AS IS" BASIS,
    ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    ## See the License for the specific language governing permissions and
    ## limitations under the License.
    ## ---------------------------------------------------------------------------
    
    # Defines users that can access the web (console, demo, etc.)
    # username: password [,rolename ...]
    admin: admin, admin
    user: user, user

      3.3 clientId 标签值

        每个客户端只能在此服务中,只能使用唯一标识值。重复使用会报错。

      参考资料:

        1. http://www.cnblogs.com/kszit/p/3596366.html

        2. http://shmilyaw-hotmail-com.iteye.com/blog/1897635

        3. http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activeMQ.html

      

  • 相关阅读:
    Web API 强势入门指南
    毫秒必争,前端网页性能最佳实践
    Windbg Extension NetExt 使用指南 【3】 ---- 挖掘你想要的数据 Managed Heap
    Windbg Extension NetExt 使用指南 【2】 ---- NetExt 的基本命令介绍
    Windbg Extension NetExt 使用指南 【1】 ---- NetExt 介绍
    WCF : 修复 Security settings for this service require Windows Authentication but it is not enabled for the IIS application that hosts this service 问题
    透过WinDBG的视角看String
    Microsoft Azure Web Sites应用与实践【4】—— Microsoft Azure网站的“后门”
    企业IT管理员IE11升级指南【17】—— F12 开发者工具
    WCF : 如何将NetTcpBinding寄宿在IIS7上
  • 原文地址:https://www.cnblogs.com/springlight/p/6433862.html
Copyright © 2011-2022 走看看