zoukankan      html  css  js  c++  java
  • ActiveMQ高级特性

    一、常用配置属性

      以下配置文件目录均为:${activemq_home}/conf/activemq.xml

      1、定期扫描清理

         ActiveMQ中有一项功能:Delete Inactive Destination。可以处理 “ 没有消费者且未处理的Destination”,也就是 queue 或者 topic 在规定时间内,没有入队记录或者有效订阅,会被清理删除。

        下面基于Queue的配置,Topic的配置类似。

         

        其中属性定义:schedulePeriodForDestinationPurge - 必填。声明扫描闲置队列的周期,单位毫秒。默认值0,为不扫描。

               gcInactiveDestinations - 必填。针对Destination的配置,声明Broker扫描闲置队列时,是否扫描此队列。默认值false

               inactiveTimoutBeforeGC - 选填。配合gcInactiveDestinations=true时才生效。声明Destination闲置多久可以进行删除,单位毫秒。默认值60。

      2、存储空间设置

         

        其中的配置标签: memoryUsage - 表示ActiveMQ使用的内存。这个值必须大于等于destinationPolicy中设置的所有Destination的内存之和。

                storeUsage - 表示持久化存储文件的大小

                tempUsage - 非持久化消息的临时内存大小

      3、kahadb方式的持久化的刷盘方式

       

        其中属性定义:journalDiskSyncStrategy - 声明持久化方式,可选值always :periodic :never 。默认always

               journalDiskSyncInterval - 声明数据落盘的时间间隔。单位毫秒

      4、JMX配置使用

         broker标签增加:

        

        managementContext标签修改:

        

      5、消息过期时间

       

        其中标签及属性定义:timeStampingBrokerPlugin - 为持久化的消息设置过期时间,zeroExpirationOverride 为没有设置有效时间的消息设置过期时间。

                    ttlCeiling 表示过期时间上线,如果程序中设置的时间超过这个值,以此值为准。

                  expireMessagesPeriod -  声明扫描过期消息的时间间隔,单位毫秒。

                  queue=">" 、topic=">" 表示匹配所有的Destination

                  queue="tp_>" 、topic="tp_>"  表示匹配所有的 tp_ 开头的Destination

    二、消息的持久化

      ActiveMQ的消息,Queue中的消息会根据配置的不同实现持久化,Topic如果不存在持久化订阅者的话会直接丢弃消息。

      持久化的配置,在${activemq_home}/conf/activemq.xml配置文件中,标签 broker内部的 persistenceAdapter 中配置持久化方案

      1、kahadb

       5.4版本之后默认的持久化方式。文件型的数据库存储,配置如下:

    <persistenceAdapter>
    <!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
    <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
    </persistenceAdapter>
    

      2、JDBC

        ActiveMQ将消息持久化保存到数据库中,没有限定使用某一个数据库。

        配置成功后,需要在数据库中建立相应的database,后续ActiveMQ启动才能正常访问。ActiveMQ正常启动后,能够自动的创建相应的数据表:

        activemq_msgs - 用于存储消息的表

        activemq_acks - 存储订阅关系

        activemq_lock - 集群环境中使用,同一时间只能有一个Master Broker

        下文配置中以MySQL为例:

        2.1、定义数据源bean(与broker同级)

    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <!-- 属性雷同可根据具体连接池配置 -->   <property name="maxActive" value="200"/>   <property name="poolPreparedStatements" value="true"/> </bean>

      其中数据连接池可以任意其他产品,只需要将相应的jar包拷贝到 ${activemq_home}/lib 目录下即可。

        2.2、配置 persistenceAdapter 标签

    <persistenceAdapter>
        <!-- createTablesOnStartup标签是否启动时创建数据表,默认值为true。一般第一次启动的时候true,后续修改为false --> 
        <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
    </persistenceAdapter>
    

     

    三、ActiveMQ的桥接模式 (network)

      ${activemq_home}/conf/activemq.xml 配置文件中,broker标签中,增加如下:

    <networkConnectors>
     <networkConnectoruri="static://(tcp://ip:61616)"/>
    </networkConnector>
    

      在两个ActiveMQ应用中,分别配置对方的地址信息,则两个Broker就通过一个static协议进行网络连接。一个consumer连接到Broker1上目的地A,当producer发送到Broker2上的相同目的地A的时候,消息会转发到Broker1上面供消费使用。

    四、虚拟Topic(Virtual Topic)

      1、Topic的两种订阅模式

        以下测试。为了能够清晰的查看消息是否持久化,可以配置成JDBC方式的持久化方案,数据库查看。

        消息生产者

    package com.cfang.prebo.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicPublisher;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Producer {
    
    	private static String brokerURL = "tcp://localhost:61618";
    	private static ConnectionFactory connectionFactory = null;
    	private static Connection connection = null;
    	private static Session session = null;
    	private static MessageProducer messageProducer = null;
    	
    	public static void main(String[] args) throws JMSException {
    		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    		connection = connectionFactory.createConnection();
    		connection.start();
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createTopic("TP_TEST_03");
    		messageProducer = session.createProducer(destination);
    		Message message = session.createTextMessage("test01");
    		messageProducer.send(message);
    		connection.close();
    	}
    }

        1.1、普通订阅

          Topic不会持久化消息,如果不存在订阅关系,则Topic会直接丢弃消息,后续再订阅也不会收到之前的任何消息。

    package com.cfang.prebo.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicSubscriber;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * @author cfang
     *	普通订阅
     */
    public class SimpleSubscribe {
    
    	private static String brokerURL = "tcp://localhost:61618";
    	private static ConnectionFactory connectionFactory = null;
    	private static Connection connection = null;
    	private static Session session = null;
    	
    	public static void main(String[] args) throws JMSException {
    		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    		connection = connectionFactory.createConnection();
    		connection.start();
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination topic = session.createTopic("TP_TEST_03");
    		MessageConsumer consumer = session.createConsumer(topic);
    		consumer.setMessageListener(new MessageListener() {
    			public void onMessage(Message message) {
    				System.out.println(message);
    			}
    		});
    	}
    }
    

      生产者发送消息,数据库不保存:

        1.2、持久化订阅

        持久化订阅者一定得预先加载一次,也就是向broker注册这个消费订阅者。之后,无论订阅者是否在线,最终都会收到消息。不在线的话,等到再次连接的时候,会将没有受过的消息全部接收处理。

    package com.cfang.prebo.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicSubscriber;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * @author cfang
     *	持久订阅
     */
    public class DurableSubscribe {
    	
    	private static String brokerURL = "tcp://localhost:61618";
    	private static ConnectionFactory connectionFactory = null;
    	private static Connection connection = null;
    	private static Session session = null;
    
    	public static void main(String[] args) throws JMSException {
    		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    		connection = connectionFactory.createConnection();
    		connection.setClientID("test01");
    		connection.start();
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Topic topic = session.createTopic("TP_TEST_03");
    		TopicSubscriber consumer = session.createDurableSubscriber(topic, "Test-subscriber");
    		consumer.setMessageListener(new MessageListener() {
    			public void onMessage(Message message) {
    				System.out.println(message);
    			}
    		});
    	}
    }
    

      运行上面订阅程序,然后关闭程序,订阅关系会保存到表 activemq_acks 中:

      

      发送消息,会持久化到表 activemq_msgs中,消息会在持久化订阅者在线的时候供消费使用:

      

        1.3、离线的持久订阅的清理

      

      上述配置说明:broker每小时检查并删除离线1天以上的持久订阅者。  

      其中标签、属性定义: offlineDurableSubscriberTimeout -  删除的离线的持久订阅者时间,单位毫秒,默认 -1

                 offlineDurableSubscriberTaskSchedule - 声明扫描时间间隔,单位毫秒。

      2、虚拟Topic的应用

        Virtual Topic,对生产者来说是个topic,对消费者来说是个queue。内部处理机制是由Broker将接收到的消息进行二次分发到每个queue,然后不同的queue对应不同的应用实现持久化,不同的消费端只需要关注自己的queue就可以了。这样,对每个queue可以做应用内可以做failover处理,也可达到了对topic的多consumer共同处理。

        下面是简单的代码测试

    package com.cfang.prebo.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    /**
     * 	VirtualTopic生产者
     */
    public class VirtualProducer {
    
    	public static void main(String[] args) throws JMSException {
    		
    		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
    		Connection connection = factory.createConnection();
    		connection.start();
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createTopic("VirtualTopic.TP");
    		MessageProducer producer = session.createProducer(destination);
    		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    		Message message = session.createTextMessage("hello world");
    		message.setStringProperty("keys", "value-bak");
    		producer.send(message);
    		session.close();
    		connection.close();
    	}
    }

      

    package com.cfang.prebo.activemq.topic;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    /**
     * VirtualTopic 消费端
     *
     */
    public class VirtualConsumer {
    
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
    		Connection connection = factory.createConnection();
    		connection.start();
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.TP");
    		Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.TP");
    		
    		MessageConsumer consumerA1 = session.createConsumer(destinationA);
    		consumerA1.setMessageListener(new MessageListener() {
    			public void onMessage(Message message) {
    				System.out.println("A1:"+message);
    			}
    		});
    		MessageConsumer consumerA2 = session.createConsumer(destinationA);
    		consumerA2.setMessageListener(new MessageListener() {
    			public void onMessage(Message message) {
    				System.out.println("A2:"+message);
    			}
    		});
    		
    		MessageConsumer consumerB = session.createConsumer(destinationB);
    		consumerB.setMessageListener(new MessageListener() {
    			public void onMessage(Message message) {
    				System.out.println("B:"+message);
    			}
    		});
    		System.in.read();
    	}
    }
    

      其中,消费端的A1和A2为一个应用,组成内部负载和failover。

      VirtualTopic的消费端queue名称的前后缀可通过配置文件${activemq_home}/conf/activemq.xml修改,一般也不需要修改前后缀,极少应用

      在 broker标签中,加入以下内容,则前缀就修改成了VT,之前例子中的消费端,监听的queue名称就改为 “VT.A.VirtualTopic.TP” 和 “VT.B.VirtualTopic.TP”

      selectorAware 属性表明 consumer 有selector 的话,则对消息进行过滤,只有符合selector的消息才会分发到queue中。

    <destinationInterceptors>
    			<virtualDestinationInterceptor>
    				<virtualDestinations>
    					<virtualTopic name=">" prefix="VT.*." selectorAware="false"/>
    				</virtualDestinations>
    			</virtualDestinationInterceptor>
    		</destinationInterceptors>
    

      

      总结点VirtualTopic:

        1、虚拟Topic是一种特殊命名的Topic,系统根据命名规则将该Topic内的消息分发给当前存在的名称对应的Queue,分发是非持久化的,新加入的Queue是接收不到过去的消息的。

        2、虚拟Topic的功能完全是中间件本身额外附加的机制,对于生产者和消费者都是无感知的。

        3、虚拟Topic是非持久化的,不存在积压。
     
  • 相关阅读:
    linux卸载mysql
    【Linux/Oracle】ORA-00031:标记要终止的会话 解决
    linux上 oracle数据库的密码过期-解决
    SYSTEM表空间满,解决方法
    Oracle 撤回已经提交的事务
    关于MySQL取不到数据
    解决mysql只能通过localhost而不能使用本机ip访问的问题
    linux 卸载php,史上最详情
    关于支付宝免密代扣申请(全),小程序,公众号,php
    mcrypt_module_open 微信小程序 php7不能使用的问题
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11339828.html
Copyright © 2011-2022 走看看