zoukankan      html  css  js  c++  java
  • Spring下ActiveMQ实战

        MessageQueue是分布式的系统里经常要用到的组件,一般来说,当需要把消息跨网段、跨集群的分发出去,就可以用这个。一些典型的示例就是:

        1、集群A中的消息需要发送给多个机器共享;

        2、集群A中消息需要主动推送,但彼此的网络不是互通的(如集群A只有过HA才能被外界访问);

        

        当然上面的几个点,除了用MQ还有其它实现方式,但是MQ无疑是非常适合用来做这些事的。众多MQ中,ActiveMQ是比较有名气也很稳定的,它发送消息的成本非常廉价,支持Queue与Topic两种消息机制。本文主要就是讲如何在Spring环境下配置此MQ:

    1、场景假设

        现有机器两台Server、Worker需要进行异步通信,另有一台ActiveMQ机器,关于MQ的配置信息存放在Zookeeper中,Zookeeper的节点有:

          - /mq/activemq/ip:mq的机器ip

          -/mq/activemq/port:这是mq的机器端口

    2、Server的Spring XML配置

        Server主要的工作就是接受Worker消息,并发送消息给Worker。主要是定义了连接MQ的连接池接受Worker消息的队列worker,发送消息给Worker的队列server:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="
     3         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     4         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">
     5 
     6     <!-- ActiveMQ连接池 -->
     7     <bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
     8         <property name="connectionFactory">
     9             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    10                 <property name="brokerURL">
    11                     <bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />
    12                 </property>
    13                 <property name="closeTimeout" value="60000" />
    14                 <!-- <property name="userName" value="admin" /> -->
    15                 <!-- <property name="password" value="admin" /> -->
    16                 <!-- <property name="optimizeAcknowledge" value="true" /> -->
    17                 <property name="optimizedAckScheduledAckInterval" value="10000" />
    18             </bean>
    19         </property>
    20     </bean>
    21 
    22 
    23     <!-- Worker任务消息 -->
    24     <bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">
    25         <constructor-arg value="worker_topic" />
    26     </bean>
    27     <!-- 任务监听容器 -->
    28     <bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    29         <property name="connectionFactory" ref="conFactory" />
    30         <property name="destination" ref="taskWorkerTopic" />
    31         <property name="messageListener">
    32             <bean class="lekko.mq.task.TaskWorkerListener" />
    33         </property>
    34         <property name="pubSubDomain" value="true" />
    35     </bean>
    36 
    37 
    38     <!-- Server任务消息 -->
    39     <bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">
    40         <constructor-arg value="server_topic" />
    41     </bean>    
    42     <!-- 任务消息发送模板 -->
    43     <bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />
    44 
    45 </beans>

        一段一段地分析,ActiveMQ连接池这里,定义了连接的bean为“conFactory”,其中broberURL属性是通过后台Java代码的静态方法来设置的,方便线上环境通过Java代码动态地切换,稍后会介绍这块代码,你现在需要知道的是,它实际上返回的就是一个字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后台来管理连接信息,直接改成“<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。

        接下来,便是Worker消息队列的定义,这里定义为“taskWorkerTopic”,类型是org.apache.activemq.command.ActiveMQTopic,(订阅模式)它表示一个消息可以被多个机器收到并处理,其它的还有org.apache.activemq.command.ActiveMQQueue,(点对点模式)表示一个消息只能被一台机器收到,当收到后消息就出队列了,其它机器无法处理。它们都有一个构造参数constructor-arg,指定了消息队列的名称,一个MQ中一个消息队列的名字是唯一的。

        Worker的消息队列定义好了之后,就是接受Worker的里消息了,这里定义了“taskWorkerContainer”,其属性分别定义了连接池、目标队列、消息处理器(我们自己的Java类,后面再讲),参数pubSubDomain用于指定是使用订阅模式还是使用点对点模式,如果是ActiveMQTopic则要设置为true,默认是false。

        好了,Server现在已经可以通过自己定义的“lekko.mq.task.TaskWorkerListener”类接受并处理taskWorkerTopic的消息了。

        如法炮制,定义一个专门用于往Worker里发消息的队列“taskServerTopic”,并定义发送消息的模板“taskServerTemplate”备用。

    3、Server端的接收类与发送类

        lekko.mq.task.TaskWorkerListener便是一个接收类示例:

     1 package lekko.mq.task;
     2 
     3 import javax.jms.Message;
     4 import javax.jms.MessageListener;
     5 
     6 import org.apache.activemq.command.ActiveMQObjectMessage;
     7 import org.apache.log4j.Logger;
     8 import org.springframework.stereotype.Service;
     9 import lekko.mq.model.MessageModel;
    10 
    11 
    12 /**
    13  * Task消息监听类
    14  * @author lekko
    15  */
    16 @Service
    17 public class TaskWorkerListener implements MessageListener {
    18 
    19     private Logger _logger = Logger.getLogger(TaskWorkerListener.class);
    20 
    21     @Override
    22     public void onMessage(Message message) {
    23         if (message instanceof ActiveMQObjectMessage) {
    24             ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
    25             try {
    26                 onMessage((MessageModel) aMsg.getObject());
    27             } catch (Exception e) {
    28                 _logger.warn("Message:${} is not a instance of MessageModel.", e);
    29             }
    30         } else {
    31             _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
    32         }
    33     }
    34 
    35     /**
    36      * 处理消息
    37      * @param message 自定义消息实体
    38      */
    39     public void onMessage(MessageModel message) { ... }
    40 
    41 }

        这里给大家演示的并不是最基础的知识,处理的消息是一个自定义的类“lekko.mq.model.MessageModel”,这个类怎么写可以随便整,反正就是一些你要传递的数据字段,但是记得要实现Serializable接口。如果你需要传递的仅仅是纯字符串,那么直接在代码的23行片,把message.toString()即可。这个类通过前面XML配置会处理来自“worker_topic”队列中的消息。

        

        再就是发送类,实际上就是把前面的taskServiceTemplate拿来用就行了:

     1 package lekko.mq.task;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.beans.factory.annotation.Qualifier;
     5 import org.springframework.jms.core.JmsTemplate;
     6 import org.springframework.stereotype.Service;
     7 import lekko.mq.model.MessageModel;
     8 
     9 
    10 /**
    11  * 服务器任务消息分发
    12  * @author lekko
    13  */
    14 @Service
    15 public class TaskServerSender {
    16 
    17     @Autowired
    18     @Qualifier("taskServerTemplate")
    19     private JmsTemplate jmsTemplate;
    20 
    21     /**
    22      * 发送消息
    23      */
    24     public void sendMessage(MessageModel msg) {
    25         jmsTemplate.convertAndSend(msg);
    26     }
    27 
    28 }

        把这个类TaskServerSender注入到任意需要用到的地方,调用sendMessage方法即可。它会往前面定义的“server_topic”中塞消息,等Worker来取。

    4、关于Zookeeper配置MQ连接信息

        Worker端的配置我这里不再阐述,因为它跟在Server端的配置太相像,区别就在于Server端是从worker_topic中取消息,往server_topic中写消息;而Worker端的代码则是反过来,往worker_topic中写消息,从server_topic中取消息。

        那么如何使用Java代码来控制ActiveMQ的配置消息呢:

     1 package lekko.mq.util;
     2 
     3 import org.apache.zookeeper.ZooKeeper;
     4 import org.apache.zookeeper.data.Stat;
     5 
     6 /**
     7  * 获取MQ配置
     8  * @author lekkoli
     9  */
    10 public class MQPropertiesFactory {
    11     
    12     private static boolean isLoaded = false;
    13     private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
    14     private static ZooKeeper _zk;
    15     private static String _ip;
    16     private static String _port;
    17 
    18     private static String getProperty(String path) throws Exception {
    19         if (_zk == null) {
    20             if (ZOOKEEPER_CLUST == null) {
    21                 throw new Exception("Zookeeper, Host "" + ZOOKEEPER_CLUST + "" is null!");
    22             }
    23             _zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
    24         }
    25         Stat s = _zk.exists(path, false);
    26         if (s != null)
    27             return new String(_zk.getData(path, false, s));
    28         throw new Exception("Zookeeper, Path "" + path + "" is not exist!");
    29     }
    30 
    31     private static void load() throws Exception {
    32         if (!isLoaded) {
    33             _ip = getProperty("/mq/activemq/ip");
    34             _port = getProperty("/mq/activemq/port");
    35             isLoaded = true;
    36         }
    37     }
    38 
    39     public static String getUrl() throws Exception {
    40         load();
    41         StringBuilder failover = new StringBuilder();
    42         String[] ips = _ip.split(";"), ports = _port.split(";");
    43         for (int i = 0; i < ips.length; ++i) {
    44             failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
    45         }
    46         failover.setLength(failover.length() - 1);
    47         String failovers = failover.toString();
    48         if (ips.length > 1) {
    49             failovers = "failover:(" + failovers + ")";
    50         }
    51         return failovers;
    52     }
    53 }

        上面的代码需要解释的地方跟MQ相关的不多,主要就是如果是mq集群,则格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代码没有对Zookeeper集群都挂了的情况,做应急连接方案。当然,无论如何本节都不是全文的重点,但是多学一技何尝不可?

        最近工作越来越忙,更新博客也是时有时无,但是我会坚持下去,还有许多工作中的点滴,在这里沉淀一下,也希望更进一步吧。

        转载请注明原址:http://www.cnblogs.com/lekko/p/4940976.html 

  • 相关阅读:
    将一个全是字母的字符串转化为大写
    将一个全是字母,以0结尾的字符串,转化为大写
    call指令和ret指令的配合使用
    将一个全是字母,以0结尾的字符串,转化为大写
    《那些年啊,那些事——一个程序员的奋斗史》——45
    《那些年啊,那些事——一个程序员的奋斗史》——46
    《那些年啊,那些事——一个程序员的奋斗史》——44
    《那些年啊,那些事——一个程序员的奋斗史》——44
    《那些年啊,那些事——一个程序员的奋斗史》——44
    《那些年啊,那些事——一个程序员的奋斗史》——43
  • 原文地址:https://www.cnblogs.com/lekko/p/4940976.html
Copyright © 2011-2022 走看看