zoukankan      html  css  js  c++  java
  • ActiveMQ的设置消息时长,事务,确认机制 ,持久化

    转载中------------

    1.消息事务

     
        消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性。(Broker:消息队列核心,相当于一个控制中心,负责路由消息、保存订阅和连接、消息确认和控制事务)
        在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。
     
     
    消息生产者-异步发送

    消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。这个过程通常称为同步发送。
    如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。但有一个例外,当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
    想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
    如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
         1) 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
         2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。
    总结:默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。
     jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!
     即使使用异步发送,也可以通过producerWindowSize来控制发送端无节制的向broker发送消息
    producerWindowSize:窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,如果达到了producerWindowSize上限,即使是异步调用也会被阻塞,防止不停向broker发送消息。
    通过jms.producerWindowSize=。。。来设置
     

    2.消息时长,确认机制 

    消息消费者-消息确认
    1、确认机制(ack_mod)
          AUTO_ACKNOWLEDGE = 1    自动确认
          CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
          DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
          SESSION_TRANSACTED = 0    事务提交并确认
     ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。所以ack_mode描述的不是producer于broker之间的关系,而是customer于broker之间的关系。
    对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer与Broker之间建立一种简单的“担保”机制.
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    第一个参数:是否支持事务,如果为true,则会忽略第二个参数,自动被jms服务器设置为SESSION_TRANSACTED。
    public class TopicPub {  
          public static void main(String[] args) throws JMSException {  
              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
              Connection connection = factory.createConnection();  
              connection.start();  
      
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
              /** 
               Session javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode) throws JMSException 
               1.transacted事务,事务成功commit,才会将消息发送到mom中 
               2.acknowledgeMode消息确认机制 
               1)、带事务的session 
               如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。 
               消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的, 
               保证几条消息在发送过程中的原子性。 
               在支持事务的session中,producer发送message时在message中带有transactionID。 
               broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中, 
               等待commit或者rollback消息。 
     
               2)、不带事务的session 
               不带事务的session的签收方式,取决于session的配置。 
               Activemq支持一下三種模式: 
               Session.AUTO_ACKNOWLEDGE  消息自动签收 
               Session.CLIENT_ACKNOWLEDGE  客戶端调用acknowledge方法手动签收 
               Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息 
               头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。 
               代码示例如下: 
               session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
               textMsg.acknowledge(); 
               */  
              Topic topic = session.createTopic("wm5920.topic");  
      
              MessageProducer producer = session.createProducer(topic);  
              producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//设置非持久化  
              //producer.setTimeToLive(5000);//5秒后过期,这个对点对点模式有效  
              TextMessage message = session.createTextMessage();  
              message.setText("message_" + System.currentTimeMillis());  
              producer.send(message);  
              System.out.println("Sent message: " + message.getText());  
              //带有事务得commit  
              //session.commit();  
              session.close();  
              connection.stop();  
              connection.close();  
          }  
      
      }  
    

      订阅主题,注:如果在发布主题前,没有订阅,是收不到消息的,这跟点对点的队列模式不同

    package com.activemq;  
    import org.apache.activemq.ActiveMQConnectionFactory;  
    import javax.jms.*;  
      
      
        public class TopicSubs{  
            public static void main(String[] args) throws JMSException {  
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
                Connection connection = factory.createConnection();  
                connection.setClientID("wm5920");  
                connection.start();  
      
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                Topic topic = session.createTopic("wm5920.topic");  
      
                //持久订阅方式,不会漏掉信息  
                TopicSubscriber subs=session.createDurableSubscriber(topic, "wm5920");  
                subs.setMessageListener(new MessageListener() {  
                    public void onMessage(Message message) {  
                        TextMessage tm = (TextMessage) message;  
                        try {  
                            System.out.println("Received message: " + tm.getText());  
                        } catch (JMSException e) {  
                            e.printStackTrace();  
                        }  
                    }  
                });  
      
                //非持久订阅方式  
    //        MessageConsumer consumer = session.createConsumer(topic);  
    //        consumer.setMessageListener(new MessageListener() {  
    //            public void onMessage(Message message) {  
    //                TextMessage tm = (TextMessage) message;  
    //                try {  
    //                    System.out.println("Received message: " + tm.getText());  
    //                } catch (JMSException e) {  
    //                    e.printStackTrace();  
    //                }  
    //            }  
    //        });  
    //        session.commit();  
    //      session.close();  
    //      connection.stop();  
    //      connection.close();  
            }  
        }  
    

      1.ActiveMQ的几种消息持久化机制
    为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
    ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
    就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。
    消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
    >> JDBC持久化方式
    使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
    activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。
    (1)配置方式
    配置持久化的方式,都是修改安装目录下conf/acticvemq.xml文件,
    首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。

    <persistenceAdapter>  
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />  
    </persistenceAdapter>  
    

      dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
    使用MySQL配置JDBC持久化:    

    <beans>  
            <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">  
                <persistenceAdapter>  
                    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>  
                </persistenceAdapter>  
            </broker>  
            <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
                <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
                <property name="username" value="activemq"/>  
                <property name="password" value="activemq"/>  
                <property name="maxActive" value="200"/>  
                <property name="poolPreparedStatements" value="true"/>  
            </bean>  
        </beans>  
    

      (2)数据库表信息
    activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
    ID:自增的数据库主键
    CONTAINER:消息的Destination
    MSGID_PROD:消息发送者客户端的主键
    MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
    EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
    MSG:消息本体的Java序列化对象的二进制数据
    PRIORITY:优先级,从0-9,数值越大优先级越高

    activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
    主要的数据库字段如下:
        CONTAINER:消息的Destination
        SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
        CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
        SUB_NAME:订阅者名称
        SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
        LAST_ACKED_ID:记录消费过的消息的ID。

        表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
        其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。

        这个表用于记录哪个Broker是当前的Master Broker。

    >> AMQ方式

    性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。
    当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。
    主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。
     而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。
     配置片段如下:

    <persistenceAdapter>  
             <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>  
        </persistenceAdapter>  
    

      虽然AMQ性能略高于下面的Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。
    >> KahaDB方式
        KahaDB是从ActiveMQ 5.4开始默认的持久化插件,也是我们项目现在使用的持久化方式。
        KahaDb恢复时间远远小于其前身AMQ并且使用更少的数据文件,所以可以完全代替AMQ。
        kahaDB的持久化机制同样是基于日志文件,索引和缓存。
        配置方式:

    <persistenceAdapter>  
            <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>  
        </persistenceAdapter>  
    

         directory : 指定持久化消息的存储目录
        journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置  

     (1)KahaDB主要特性
        1、日志形式存储消息;
        2、消息索引以B-Tree结构存储,可以快速更新;
        3、完全支持JMS事务;
        4、支持多种恢复机制;

     (2)KahaDB的结构
        消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
        消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

        Data logs:


        Data logs用于存储消息日志,消息的全部内容都在Data logs中。
        同AMQ一样,一个Data logs文件大小超过规定的最大值,会新建一个文件。同样是文件尾部追加,写入性能很快。
        每个消息在Data logs中有计数引用,所以当一个文件里所有的消息都不需要了,系统会自动删除文件或放入归档文件夹。

        Metadata cache :
        缓存用于存放在线消费者的消息。如果消费者已经快速的消费完成,那么这些消息就不需要再写入磁盘了。
        Btree索引会根据MessageID创建索引,用于快速的查找消息。这个索引同样维护持久化订阅者与Destination的关系,以及每个消费者消费消息的指针。

        Metadata store 
        在db.data文件中保存消息日志中消息的元数据,也是以B-Tree结构存储的,定时从Metadata cache更新数据。Metadata store中也会备份一些在消息日志中存在的信息,这样可以让Broker实例快速启动。
        即便metadata store文件被破坏或者误删除了。broker可以读取Data logs恢复过来,只是速度会相对较慢些。
        >>LevelDB方式

        从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。
        目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,可能是以后的趋势。
        在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

  • 相关阅读:
    CodeForces Gym 100500A A. Poetry Challenge DFS
    CDOJ 486 Good Morning 傻逼题
    CDOJ 483 Data Structure Problem DFS
    CDOJ 482 Charitable Exchange bfs
    CDOJ 481 Apparent Magnitude 水题
    Codeforces Gym 100637G G. #TheDress 暴力
    Gym 100637F F. The Pool for Lucky Ones 暴力
    Codeforces Gym 100637B B. Lunch 找规律
    Codeforces Gym 100637A A. Nano alarm-clocks 前缀和
    TC SRM 663 div2 B AABB 逆推
  • 原文地址:https://www.cnblogs.com/bug1024/p/9132552.html
Copyright © 2011-2022 走看看