消息存储持久化:
ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复( recovery )方式
PTP: Queue的存储是很简单的,就是-一个FIFO的Queue
PUB/SUB: 对于持久化订阅主题,每一个消费者将获得一个消息的复制。
-
有效的消息存储:
ActiveMQ提供了-一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种:
1: AMQ消息存储-基于文件的存储方式,是以前的默认消息存储
2: KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式
3:JDBC消息存储-消息基于JDBC存储的
4:Memory消息存储-基于内存的消息存储- KahaDB Message Store概述:
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解快方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到datalogs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB基本配置例子:
默认的位置在activemq/data/kahadb
db-x.log files: 以db-递增数字.log命名。
archive directory: 当配置支持archiving(默认不支持)并且存在,该文件夹才会创建。用于存储不再需要的data logs。
db.data: 存储btree索引
db.redo: 用于hard-stop broker后,btree索引的重建
可用属性:
名称 | 含义 |
---|---|
director | KahaDB存放的路径,默认值activemq-data |
indexWri teBatchSize | 批量写入磁盘的索弓lpage数量, 默认值1000 |
indexCacheSize | 内存中缓存索引page的数量,默认值10000 |
enableIndexWri teAsync | 是否异步写出索引,默认false |
journalMaxFileLength | 设置每个消息data log的大小,默认是32MB |
enableJournalDiskSyncs | 设置是否保证每个没有事务的内容,被同步写入磁盘,JMS持久化的时候需要,默认为true |
cleanupInterval | 在检查到不再使用的消息后,在具体删除消息前的时间,默认30000 |
checkpo intInterval | checkpoint 的间隔时间,默认5000 |
ignoreMi ssingJournalfiles | 是否忽略丢失的消息日志文件,默认false |
checkForCorruptJournalFiles | 在启动的时候,将会验证消息文件是否损坏,默认false |
checksumJournalFiles | 是否为每个消息日志文件提供checksum,默认false |
archiveDa taLogs | 是否移动文件到特定的路径,而不是删除它们,默认false |
directoryArchive | 定义消息已经被消费过后,移动data log到的路径,默认null |
databaseLockedWai tDelay | 获得数据库锁的等待时间(used by shared master/slave), 默认10000 |
maxAsyncJobs | 设置最大的可以存储的异步消息队列,默认值10000,可以和concurrentMe ssageProducers设置成一样的值 |
concurrentS toreAndDi spa tchTransactions | 是否分发消息到客户端,同时事务存储消息,默认true |
concurrentS toreAndDi spatchTopics | 是否分发Topic消息到客户端,同时进行存储,默认true |
concurrentS toreAndDi spa tchQueues | 是否分发queue消息到客户端,同时进行存储,默认true |
-
使用JDBC持久化消息:
默认表名:activemq,会自动建表
配置:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#activemq-mysql" />
</persistenceAdapter>
<!-- 这个需要放在broker标签结束的地方 -->
<bean id="activemq-mysql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.0.102:3306/activemq?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
需要给activemq/lib增加Mysql的jar
- JDBC Message Store with ActiveMQ Jonrnal:
这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。配置示例如下:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#activemq-mysql"
dataDirectory="activemq-data" />
</persistenceFactory>
使用这个,其实是相当于,发送的消息先写入到内存中,日志文件中。然后在写入到数据库中。
JDBC Store和JDBC Message Store with ActiveMQ Journal 的区别:
1: Jdbc wi th journal的性能优于jdbc
2: Jdbc用于master/slave模式的数据库分享
3: Jdbc with journal不能用于master/slave模式
4: 一般情况下,推荐使用jdbc with journal
- Memory Message Store:
内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,需要注意设置broker的jvm和内存限制,配置如下: