zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、持久化机制

    1、Activemq持久化

    1.1 什么是持久化:

    持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

    1.2 持久化的作用

    将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。

    详情——官网 : http://activemq.apache.org/persistence

    1.3 ActiveMQ 支持的消息持久化机制

    • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
    • ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
    • 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
    • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

    注意:一句话:ActiveMQ宕机了,消息不会丢失的机制。

    二、持久化方式

    1、Activemq的持久化方式有几种

    1.1 AMQ Mesage Store(了解)

    AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经
    全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

    注意:基于文件的存储方式,是以前的默认消息存储,现在不用了

    1.2 KahaDB消息存储(默认)

     5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力

    KahaDB 的属性配置 : http://activemq.apache.org/kahadb

    说明:它使用一个事务日志和 索引文件来存储所有的地址

    db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
    
    db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-<x>.log 里的消息
    
    一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
    
    db.free 存储空闲页 ID 有时会被清除
    
    db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
    
    lock 顾名思义就是锁
    
    四类文件+一把锁 ==》 KahaDB

    1.3 JDBC消息存储

    消息基于JDBC存储的

    1.4 LevelDB消息存储(了解)

    希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快

    它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引

    这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
    但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

    题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

    默认配置如下:

    <persistenceAdapter>
          <levelDB directory="activemq-data"/>
    </persistenceAdapter>

    1.5 JDBC Message Store with ActiveMQ Journal

    2、JDBC存储消息(重点)

    JDBC : 有一部分数据会真实的存储到数据库中 
           使用JDBC 的持久化,

    修改配置文件,默认 kahaDB

    修改之前:

    <persistenceAdapter>
    
           <kahaDB directory="${activemq.data}/kahadb"/>  
    
     </persistenceAdapter>

    修改之后:

    <persistenceAdapter>
    
          <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    
     </persistenceAdapter>

    在activemq 的lib 目录下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

    修改配置文件 : activemq.xml 使其连接自己windows 上的数据库,并在本地创建名为activemq 的数据库

    让linux 上activemq 可以访问到 mysql ,之后产生消息。

    ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock

    activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
    
    activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
    
    activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中

    点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除

    但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。

    如果表没生成,可能需要自己创建

    -- auto-generated definition
    create table ACTIVEMQ_ACKS
    (
        CONTAINER     varchar(250)     not null comment '消息的Destination',
        SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
        CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
        SUB_NAME      varchar(250)     not null comment '订阅者名称',
        SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
        LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
        PRIORITY      bigint default 5 not null comment '优先级,默认5',
        XID           varchar(250)     null,
        primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
    )
        comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';
    
    create index ACTIVEMQ_ACKS_XIDX
        on ACTIVEMQ_ACKS (XID);
    
     
    -- auto-generated definition
    create table ACTIVEMQ_LOCK
    (
        ID          bigint       not null
            primary key,
        TIME        bigint       null,
        BROKER_NAME varchar(250) null
    );
    
     
    -- auto-generated definition
    create table ACTIVEMQ_MSGS
    (
        ID         bigint       not null
            primary key,
        CONTAINER  varchar(250) not null,
        MSGID_PROD varchar(250) null,
        MSGID_SEQ  bigint       null,
        EXPIRATION bigint       null,
        MSG        blob         null,
        PRIORITY   bigint       null,
        XID        varchar(250) null
    );
    
    create index ACTIVEMQ_MSGS_CIDX
        on ACTIVEMQ_MSGS (CONTAINER);
    create index ACTIVEMQ_MSGS_EIDX
        on ACTIVEMQ_MSGS (EXPIRATION);
    create index ACTIVEMQ_MSGS_MIDX
        on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
    create index ACTIVEMQ_MSGS_PIDX
        on ACTIVEMQ_MSGS (PRIORITY);
    create index ACTIVEMQ_MSGS_XIDX
        on ACTIVEMQ_MSGS (XID);
    

    坑:

    JDBC 改进: 加入高速缓存机制 Journal

    高速缓存在 activemq.xml 中的配置:

    ⑤代码运行验证

    一定要开启持久化 :  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

    (1)队列Queue:

    生产者:

          。。。。。
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageProducer messageProducer = session.createProducer(queue);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for (int i = 0; i < 3; i++) {
    
           。。。。。。。。。。。

    运行结果:

    在点对点类型中

    • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
    • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

     而且点对点类型中消息一旦被Consumer消费,就从数据中删除
     
    消费前的消息,会被存放到数据库

    上面的消息被消费后被MQ自动删除

    (2)主题Topic

            。。。。
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.setClientID("我是生产者张三");
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
            MessageProducer messageProducer = session.createProducer(topic);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            for (int i = 0; i < 3; i++) {
            。。。。。。

     在点对点类型中

    • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
    • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

     而且点对点类型中消息一旦被Consumer消费,就从数据中删除
     
    消费前的消息,会被存放到数据库

    上面的消息被消费后被MQ自动删除

    3、JDBC Message store with ActiveMQ Journal(重点)

    3.1 定义:

    3.2 说明

    这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
    ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
     
    当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
    举个例子:

    生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

    3.3 配置方式:

    原来的配置:

    <persistenceAdapter> 
            <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
    </persistenceAdapter>

    修改的结果:

    <persistenceFactory>        
             <journalPersistenceAdapterFactory 
                         journalLogFiles="5" 
                         journalLogFileSize="32768" 
                         useJournal="true" 
                         useQuickJournal="true" 
                         dataSource="#mysql-ds" 
                         dataDirectory="../activemq-data" /> 
    </persistenceFactory>

    以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

    小总结:

    1. 持久化消息是指:
    • MQ 所在的服务器down 了消息也不会丢失

         2.持久化机制演化过程:

    • 从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。

         3. ActiveMQ 消息持久化机制有:

    AMQ 基于日志文件
    KahaDB 基于日志文件,5.4 之后的默认持久化
    JDBC 基于第三方数据库
    LevelDB 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
    ReplicatedLevelDB Store

    从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

    但是无论使用哪种持久化方式,消息的存储逻辑都一样

  • 相关阅读:
    html 入门 "地表最强"干货 你值得拥有
    python信号量
    死锁 与 递归锁
    互斥锁
    进程之间的通讯
    进程与多道技术
    进程对象常用属性
    开启子进程的方式2
    牛客多校赛2K Keyboard Free
    省选刷题小记 (06~10)
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288728.html
Copyright © 2011-2022 走看看