zoukankan      html  css  js  c++  java
  • ActiveMQ 消息存储持久化

    ActiveMQ中对于投递模式设置为持久化的消息,broker接收到到消息之后,会先把消息存储到存储介质,然后再转发到消息的监听者
    ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB
    持久化配置路径:ActiveMQapache-activemqconfactivemq.xml

    官方文档: http://activemq.apache.org/persistence.html

    Message保存方式

    1. PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
    2. NON_PERSISTENT:保存到内存,消费之后message被清除。

    kahaDB 持久化为日志文件(默认)

    所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息恢复操作。

    特点:基于文件的本地数据库储存形式,是一个支持事务,可靠,高性能,可扩展的消息存储器。

    data/kahadb这个目录下,会生成四个文件,来完成消息持久化

    1. db.data它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息。
      一旦这个消息不在被需要,数据文件可以被删除或归档。
    2. db.redo用来进行消息恢复
    3. db-*.log存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增
    4. lock文件锁,写入当前获得kahadb读写权限的broker ,用于在集群环境下的竞争处理
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/>
    </persistenceAdapter>
    

    JDBC 基于第三方数据库

    使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:

    1. activemq_msgs:queue和topic的消息都存在这个表中
    2. activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
    3. activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

    步骤:

    1. 将用来保存数据的数据库的连接jar包放到ActiveMQlib下,如:mysql就是commons-dbcp-1.4``mysql-connector-java-8.0.9``commons-pool-1.6
    2. conf下的ActiveMQ.xml文件中persistenceAdapter标签下的<kahaDB directory="${activemq.data}/kahadb"/>
      <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>代替
      createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false
    3. 在conf下的ActiveMQ.xml文件中在</broker>后面``<import/>前面添加相关数据库的DataSource``bean
    4. 创建相关数据库ld:对应数据库的url配置
    5. 启动ActiveMQ,如果ld下自动建立了三张表,证明持久化成功
    <beans>
        <broker>
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
            </persistenceAdapter>
        </broker>
         
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">         
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>       
            <property name="url" value="jdbc:mysql://localhost/ld?relaxAutoCommit=true"/> 
            <property name="username" value="root"/>
            <property name="password" value="aaa"/>
            <property name="maxActive" value="200"/>   
            <property name="poolPreparedStatements" value="true"/> 
        </bean>
    </beans>
    

    JDBC Message Store with ActiveMQ Journal(高速缓存)

    这种方式克服了JDBC Store的不足,JDBC存储每次消息过来,都需要去写库和读库ActiveMQ Journal,使用延迟存储数据到数据库,当消息来到时先缓存到文件中,延迟后才写到数据库中

    如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的 10%的消息到DB。

    配置方式,先把原来的jdbc持久化配置去掉,加入以下配置

        <broker>
            <persistenceFactory>
                <journalPersistenceAdapterFactory
                journalLogFiles="4"
                journalLogFileSize="32768"
                useJournal="true"
                useQuickJournal="true"
                dataSource="#mysql-ds"
                dataDirectory="activemq-data" />
            </persistenceFactory>
        </broker>
    
  • 相关阅读:
    C# 读取sqlite文件
    MongoDB聚合管道
    提取Word里的文本内容 C#
    Two Sum【LeetCode】
    Could not create SSL/TLS secure channel.
    处理Task引发的异常
    https请求抛出异常
    hexo+github page +markdown问题汇总
    通过自定义比较器排序(C#版)
    GridView固定行宽,自动换行,鼠标放在Table的Tr上变色
  • 原文地址:https://www.cnblogs.com/loveer/p/11407959.html
Copyright © 2011-2022 走看看