zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、持久化机制

    1、Activemq持久化

    1.1 什么是持久化:

    持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

    1.2 持久化的作用

    将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。

    详情——官网 : http://activemq.apache.org/persistence

    1.3 ActiveMQ 支持的消息持久化机制

    • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
    • ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
    • 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
    • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

    注意:一句话:ActiveMQ宕机了,消息不会丢失的机制。

    二、持久化方式

    1、Activemq的持久化方式有几种

    1.1 AMQ Mesage Store(了解)

    AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经
    全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

    注意:基于文件的存储方式,是以前的默认消息存储,现在不用了

    1.2 KahaDB消息存储(默认)

     5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力

    KahaDB 的属性配置 : http://activemq.apache.org/kahadb

    说明:它使用一个事务日志和 索引文件来存储所有的地址

    db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
    
    db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-<x>.log 里的消息
    
    一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
    
    db.free 存储空闲页 ID 有时会被清除
    
    db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
    
    lock 顾名思义就是锁
    
    四类文件+一把锁 ==》 KahaDB

    1.3 JDBC消息存储

    消息基于JDBC存储的

    1.4 LevelDB消息存储(了解)

    希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快

    它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引

    这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
    但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

    题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

    默认配置如下:

    <persistenceAdapter>
          <levelDB directory="activemq-data"/>
    </persistenceAdapter>

    1.5 JDBC Message Store with ActiveMQ Journal

    2、JDBC存储消息(重点)

    JDBC : 有一部分数据会真实的存储到数据库中 
           使用JDBC 的持久化,

    修改配置文件,默认 kahaDB

    修改之前:

    <persistenceAdapter>
    
           <kahaDB directory="${activemq.data}/kahadb"/>  
    
     </persistenceAdapter>

    修改之后:

    <persistenceAdapter>
    
          <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    
     </persistenceAdapter>

    在activemq 的lib 目录下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

    修改配置文件 : activemq.xml 使其连接自己windows 上的数据库,并在本地创建名为activemq 的数据库

    让linux 上activemq 可以访问到 mysql ,之后产生消息。

    ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock

    activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
    
    activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
    
    activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中

    点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除

    但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。

    如果表没生成,可能需要自己创建

    -- auto-generated definition
    create table ACTIVEMQ_ACKS
    (
        CONTAINER     varchar(250)     not null comment '消息的Destination',
        SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
        CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
        SUB_NAME      varchar(250)     not null comment '订阅者名称',
        SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
        LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
        PRIORITY      bigint default 5 not null comment '优先级,默认5',
        XID           varchar(250)     null,
        primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
    )
        comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';
    
    create index ACTIVEMQ_ACKS_XIDX
        on ACTIVEMQ_ACKS (XID);
    
     
    -- auto-generated definition
    create table ACTIVEMQ_LOCK
    (
        ID          bigint       not null
            primary key,
        TIME        bigint       null,
        BROKER_NAME varchar(250) null
    );
    
     
    -- auto-generated definition
    create table ACTIVEMQ_MSGS
    (
        ID         bigint       not null
            primary key,
        CONTAINER  varchar(250) not null,
        MSGID_PROD varchar(250) null,
        MSGID_SEQ  bigint       null,
        EXPIRATION bigint       null,
        MSG        blob         null,
        PRIORITY   bigint       null,
        XID        varchar(250) null
    );
    
    create index ACTIVEMQ_MSGS_CIDX
        on ACTIVEMQ_MSGS (CONTAINER);
    create index ACTIVEMQ_MSGS_EIDX
        on ACTIVEMQ_MSGS (EXPIRATION);
    create index ACTIVEMQ_MSGS_MIDX
        on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
    create index ACTIVEMQ_MSGS_PIDX
        on ACTIVEMQ_MSGS (PRIORITY);
    create index ACTIVEMQ_MSGS_XIDX
        on ACTIVEMQ_MSGS (XID);
    

    坑:

    JDBC 改进: 加入高速缓存机制 Journal

    高速缓存在 activemq.xml 中的配置:

    ⑤代码运行验证

    一定要开启持久化 :  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

    (1)队列Queue:

    生产者:

          。。。。。
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageProducer messageProducer = session.createProducer(queue);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for (int i = 0; i < 3; i++) {
    
           。。。。。。。。。。。

    运行结果:

    在点对点类型中

    • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
    • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

     而且点对点类型中消息一旦被Consumer消费,就从数据中删除
     
    消费前的消息,会被存放到数据库

    上面的消息被消费后被MQ自动删除

    (2)主题Topic

            。。。。
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.setClientID("我是生产者张三");
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
            MessageProducer messageProducer = session.createProducer(topic);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for (int i = 0; i < 3; i++) {
            。。。。。。

     在点对点类型中

    • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
    • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

     而且点对点类型中消息一旦被Consumer消费,就从数据中删除
     
    消费前的消息,会被存放到数据库

    上面的消息被消费后被MQ自动删除

    3、JDBC Message store with ActiveMQ Journal(重点)

    3.1 定义:

    3.2 说明

    这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
    ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
     
    当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
    举个例子:

    生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

    3.3 配置方式:

    原来的配置:

    <persistenceAdapter> 
            <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
    </persistenceAdapter>

    修改的结果:

    <persistenceFactory>        
             <journalPersistenceAdapterFactory 
                         journalLogFiles="5" 
                         journalLogFileSize="32768" 
                         useJournal="true" 
                         useQuickJournal="true" 
                         dataSource="#mysql-ds" 
                         dataDirectory="../activemq-data" /> 
    </persistenceFactory>

    以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

    小总结:

    1. 持久化消息是指:
    • MQ 所在的服务器down 了消息也不会丢失

         2.持久化机制演化过程:

    • 从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。

         3. ActiveMQ 消息持久化机制有:

    AMQ 基于日志文件
    KahaDB 基于日志文件,5.4 之后的默认持久化
    JDBC 基于第三方数据库
    LevelDB 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
    ReplicatedLevelDB Store

    从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

    但是无论使用哪种持久化方式,消息的存储逻辑都一样

  • 相关阅读:
    洛谷p1017 进制转换(2000noip提高组)
    Personal Training of RDC
    XVIII Open Cup named after E.V. Pankratiev. Grand Prix of Eurasia
    XVIII Open Cup named after E.V. Pankratiev. Grand Prix of Peterhof.
    Asia Hong Kong Regional Contest 2019
    XVIII Open Cup named after E.V. Pankratiev. Grand Prix of Siberia
    XVIII Open Cup named after E.V. Pankratiev. Ukrainian Grand Prix.
    XVIII Open Cup named after E.V. Pankratiev. GP of SPb
    卜题仓库
    2014 ACM-ICPC Vietnam National First Round
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288728.html
Copyright © 2011-2022 走看看