zoukankan      html  css  js  c++  java
  • 九、ActiveMQ的消息存储和持久化

    一、介绍

    1.1、此处持久化和之前的持久化的区别

     

      MQ高可用:事务、可持久、签收,是属于MQ自身特性,自带的是MQ自身。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。

    1.2、概述

      官网文档:http://activemq.apache.org/persistence

      持久化是什么?一句话就是:ActiveMQ宕机了,消息不会丢失的机制。

      说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

    二、持久化分类

    2.1、AMQ Message Store

      基于文件的存储机制,是以前的默认机制,现在不再使用。

      AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中,文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

    2.1、kahaDB

      现在默认的。下面我们再详细介绍。

    2.3、JDBC消息存储

      下面我们再详细介绍。

    2.4、LevelDB消息存储

      这种文件系统是ActiveMQ5.8之后引进的,它和kahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供了比kahaDB更快的持久性。

      但它不使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引

       <persistenceAdapter>
             <levelDB directory="activemq-data"/>
       </persistenceAdapter>

    2.5、JDBC Message Store with ActiveMQ Journal

      下面我们再详细介绍。

    三、kahaDB消息存储

    3.1、介绍

      基于日志文件从ActiveMQ5.4(含)开始默认的持久化插件 官网文档:http://activemq.aache.org/kahadb 官网上还有一些其他配置参数。

      配置文件activemq.xml中

       <persistenceAdapter>
             <kahaDB directory="${activemq.data}/kahadb"/>
       </persistenceAdapter>

      日志文件的存储目录在:%activemq安装目录%/data/kahadb

    3.2、说明

      KahaBD是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储文件它所有的地址。

      KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件就会丢失。

    [root@centos64 activemq5.16]#  ll /opt/model/activemq5.16/data/kahadb/
    total 300
    -rw-r--r--. 1 root root 33554432 Jan 17 11:47 db-1.log
    -rw-r--r--. 1 root root   167936 Jan 17 18:48 db.data
    -rw-r--r--. 1 root root    94384 Jan 17 18:48 db.redo
    -rw-r--r--. 1 root root        8 Jan 17 11:47 lock

    3.3、KahaDB的存储原理

      kahaDB在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比先的非常简洁。

    1. db-<Number>.log KahaDB存储消息到预定大小的数据记录文件中,文件命名为db<Number>.log。当数据文件已满时,一个新的文件随之创建,number数据也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log...。当不再用引用到数据文件中的任何消息时,文件会被删除或归档。
    2. db.data该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-<Number>.log里面存储的消息。

    3. db.free当前db.data文件里哪些页面是空闲,文件具体内容是所有空闲页的ID

    4. db.redo用来进行消息恢复,如果kahaDB消息存储在强制退出后启动,用于恢复BTree索引。

    5. lock文件锁,表示当前获取的kahaDB读写权限是Broker。

    四、JDBC消息存储

    4.1、设置步骤

    • 原理图 

    • 添加mysql数据库的驱动包到lib文件夹

    • jdbcPersistenceAdapter配置

      修改${activeMQ}/conf/activemq.xml配置文件。

    修改前kahaBD修改后jdbcPersistenceAdapter

    <persistenceAdapter>

       <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>

    <persistenceAdapter>

       <jdbcPersistenceAdapter dataSource="#mysql-ds"               createTableOnStartup="true"/>

    </persistenceAdapter>

      dataSource指定将要引用的持久化数据库的bean名称,
    createTablesOnStatrtup是否在启动的时候创建数据表,默认值是true。这样每次启动都会去创建数据表了,

    一般是第一次启动的时候设置为true之后该成false。

       修改配置文件activemq.xml。将之前的替换为jdbc的配置。如下:

    <persistenceAdapter>  
      <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/> 
    </persistenceAdapter>
    • 数据库连接池配置

      需要我们准备一个mysql数据库,并创建一个名为activemq的数据库。在</broker>标签和<import>标签之间插入数据库连接池配置,具体操作如下:

    </broker>
        <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://mysql数据库URL/activemq?relaxAutoCommit=true"/>
            <property name="username" value="mysql数据库用户名"/>
            <property name="password" value="mysql数据库密码"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>
    
     <import resource="jetty.xml"/>

      之后需要建一个数据库,名为activemq。新建的数据库要采用latin1 或者ASCII编码。https://blog.csdn.net/JeremyJiaming/article/details/88734762
      默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。

    • 建库SQL和创表说明

      重启activemq。会自动生成如下3张表。如果没有自动生成,需要我们手动执行SQL。我个人建议要自动生成,我在操作过程中查看日志文件,发现了不少问题,最终解决了这些问题后,是能够自动生成的。如果不能自动生成说明你的操作有问题。如果实在不行,下面是手动建表的SQL:

    -- 建表SQL
    -- auto-generated definition
    create table ACTIVEMQ_ACKS
    (
        CONTAINER     varchar(250)     not null comment '消息的Destination',
        SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
        CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
        SUB_NAME      varchar(250)     not null comment '订阅者名称',
        SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
        LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
        PRIORITY      bigint default 5 not null comment '优先级,默认5',
        XID           varchar(250)     null,
        primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
    )
        comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';
    
    create index ACTIVEMQ_ACKS_XIDX
        on ACTIVEMQ_ACKS (XID);
    
     
    -- auto-generated definition
    create table ACTIVEMQ_LOCK
    (
        ID bigint not null primary key,
        TIME  bigint null,
        BROKER_NAME varchar(250) null
    );
    
     
    -- auto-generated definition
    create table ACTIVEMQ_MSGS
    (
        ID bigint not null primary key,
        CONTAINER  varchar(250) not null,
        MSGID_PROD varchar(250) null,
        MSGID_SEQ  bigint       null,
        EXPIRATION bigint       null,
        MSG        blob         null,
        PRIORITY   bigint       null,
        XID        varchar(250) null
    );
    
    create index ACTIVEMQ_MSGS_CIDX
        on ACTIVEMQ_MSGS (CONTAINER);
    
    create index ACTIVEMQ_MSGS_EIDX
        on ACTIVEMQ_MSGS (EXPIRATION);
    
    create index ACTIVEMQ_MSGS_MIDX
        on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
    
    create index ACTIVEMQ_MSGS_PIDX
        on ACTIVEMQ_MSGS (PRIORITY);
    
    create index ACTIVEMQ_MSGS_XIDX
        on ACTIVEMQ_MSGS (XID);

      ACTIVEMQ_MSGS数据表:

      • ID:自增数据库主键。
      • CONTAINER:消息的Destination
      • MSGID_PROD:消息发送者的主键。
      • MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID。
      • EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数。
      • MSG:消息本体的Java序列化对象的二进制数据。
      • PRIORITY:优先级,从0-9,数值越大优先级越高。

      消息表,缺省表名为ACTIVEMQ_MSGS,queue和topic都存在里面,queue消费掉就从表中删除,topic 消费掉还是会保存在表中,结构如下:

    Column name Default type Description
    ID INTEGER the sequence ID used to retrieve the message
    CONTAINER VARCHAR(250) the destination of the message
    MSGID PROD VARCHAR(250) The ID of the message producer

      ACTIVEMQ_ACKS数据表:activemq_acks用于存储订阅关系信息和最后一个持久订阅接收的消息ID,如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,数据库字段如下

      • CONTAINER:消息的Destination
      • SUB_DEST:如果使用Static集群,这个字段会集群其他系统的信息。
      • CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分。
      • SUB_NAME:订阅者名称
      • SELECTOR:选择器,可以选择只消费满足条件的消息,条件可以使用自定义属性实现,可支持多属性AND 和OR 操作。
      • LAST_ACKED_ID:记录消费过的消息的ID

      ACTIVEMQ_LOCK数据表:

      表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master,Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

    Column name Default type Description
    ID INTEGER A unique ID for lock
    Broker Name VARCHAR(250) The Name of the ActiveMQ broker that has the lock

    4.2、 queue验证和数据表变

    • 点对点类型中当,
      • DeliveryMode设置为NON_PERSISTENT时,消息被保存在内存中,不会将消息持久化到数据库
      • DeliveryMode设置为PERSISTENT时,消息被保存在Broker的相应文件或者数据库中。
      • 消息一旦被Consumer消费就会从Broker中删除。
    • 我们使用queue模式持久化,发布3条消息后,发现ACTIVEMQ_MSGS数据表多了3条数据。

    • 启动消费者,消费了所有的消息后,发现数据表的数据消失了。

    • queue模式非持久化,不会持久化消息到数据表。

    4.3、 topic验证和说明

      我们先启动一下持久化topic的消费者。看到ACTIVEMQ_ACKS数据表多了一条消息。

    // 持久化topic 的消息消费者
    public class JmsConsummer_persistence {
        private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
        public static final String TOPIC_NAME = "jdbc-02";
    
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.setClientID("marrry");
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
            connection.start();
            Message message = topicSubscriber.receive();
            while (null != message){
                TextMessage textMessage = (TextMessage)message;
                System.out.println(" 收到的持久化 topic :"+textMessage.getText());
                message = topicSubscriber.receive();
            }
            session.close();
            connection.close();
        }
    }

      ACTIVEMQ_ACKS数据表,多了一个消费者的身份信息。一条记录代表:一个持久化topic消费者

      我们启动持久化生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据,消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。这个是要注意的,持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。

    4.4、总结

    • Queue:在没有消费者消费的情况下会将消息保存到activemq_msgs中,只要有任意一个消费者已经消费过了,消费之后这些消息将会立即被删除。
    • Topic:一般先启动消费者订阅然后再生产的情况下会将消息保存在activemq_acks。

    4.5、开发中避坑

      在配置关系型数据库做为ActiveMQ的持久化方案时,可能遇到的问题

    • 数据库jar包
      记得需要使用到的相关jar文件放置到${activemq_home}/lib目录下。mysql-jdbc驱动的jar包和对应的数据库连接池jar包
    • createTablesOnStatrtup属性
      jdbcPersistenceAdapter标志中设置createTablesOnStatrtup属性为true时在第一次启动ActivaMQ时,activeMQ服务节点会自动创建所需要的数据表,启动完成后可以去掉这个属性,或者更改成false。
    • 下划线
      java.lang.IllegalStateException:BeanFactory not initialized or already closed,这是因为主机名称中有"_"符号,更改主机名称。

    五、JDBC Message Store with ActiveMQ Journal

    5.1、说明

      这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。

      举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。 为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高

    5.2、配置

      下面是基于上面JDBC配置,再做修改activemq.xml:  

    修改前persistenceAdapter修改后persistenceFactory

    <persistenceAdapter>

       <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>

    </persistenceAdapter>

    <persistenceFactory>
       <journalPersistenceAdapterFactory
            journalLogFiles = "4"
            journalLogFilesSize = "32768"
            useJournal="true"
            useQuickJournal ="true"
            dataSource="#mysql-ds"
            dataDirectory = "activemq/data"
       />
    </persistenceFactory>

    六、总结

      ① jdbc效率低,kahaDB效率高,jdbc+Journal效率较高

      ② 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。

      ③ 持久化机制演变的过程:

      从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。

      ④ ActiveMQ消息持久化机制有:

    AMQ基于日志文件
    KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用
    JDBC 基于第三方数据库
    Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。
  • 相关阅读:
    负载均衡机制
    测试先行
    MVC模式在Java Web应用程序中的实例
    MVC模式学习
    Java反射机制
    软件开发火狐自动填写用户名和密码
    23种设计模式概述
    站立会议总结02
    站立会议总结01
    买书最低价问题
  • 原文地址:https://www.cnblogs.com/jdy1022/p/14281135.html
Copyright © 2011-2022 走看看