ActiveMQ 9. ActiveMQ的消息存储和持久化
9.1. 是什么
持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失的机制。
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
9.2. 有哪些
9.2.1. AMQ Mesage Store(了解)
基于文件的存储方式,是以前的默认消息存储,现在不用了
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
9.2.2. KahaDB消息存储(默认)
9.2.2.1. KahaDB官网相关
基于日志文件,从ActiveMQ5.4开始默认的持久化插件
配置文件activemq.xml中的配置为:
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
9.2.2.2. 说明
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。
数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
9.2.2.3. 存储原理
-
db-
.log 存储数据,一个存满会再次创建 db-2、db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档 -
db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-
.log 里的消息 类似mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
-
db.free 当前db.data文件里哪些页面是空闲的,文件具体内容是所有空闲页的ID
-
db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
-
lock 顾名思义就是锁
四类文件+一把锁 ==》 KahaDB
9.2.3. LevelDB消息存储(了解)
这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 ZooKeeper 所以这些官方还没有定论,仍旧使用 KahaDB。
默认配置如下:
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
9.2.4. JDBC消息存储
-
添加mysql数据库的驱动包到lib文件夹
mysql-connector-java-xxx.jar
-
修改配置文件activemq.xml
2.1. 修改
标签 修改前:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
修改后:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" /> </persistenceAdapter>
dataSource指定将要引用的持久化数据库的bean名称,
createTablesOnStartup是否在启动时创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改为false。
2.2. 增加
标签 标签在 和 之间
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://192.168.42.117:3306/activemq?relaxAutoCommit=true" />
<property name="username" value="root" />
<property name="password" value="1234" />
<property name="maxTotal" value="200" />
<property name="poolPreparedStatements" value="true" />
</bean>
-
数据库建仓建表
建一个名为activemq的数据库
ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock
-
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
-
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
-
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中
-
如果新建数据库OK+上述配置OK+代码运行OK,3张表会自动生成。通过data/activemq.log
可以查看启动日志。
万一情况,需要手动建表,SQL如下:
CREATE TABLE `activemq_acks` (
`CONTAINER` varchar(250) NOT NULL,
`SUB_DEST` varchar(250) DEFAULT NULL,
`CLIENT_ID` varchar(250) NOT NULL,
`SUB_NAME` varchar(250) NOT NULL,
`SELECTOR` varchar(250) DEFAULT NULL,
`LAST_ACKED_ID` bigint(20) DEFAULT NULL,
`PRIORITY` bigint(20) NOT NULL DEFAULT '5',
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `activemq_lock` (
`ID` bigint(20) NOT NULL,
`TIME` bigint(20) DEFAULT NULL,
`BROKER_NAME` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `activemq_msgs` (
`ID` bigint(20) NOT NULL,
`CONTAINER` varchar(250) NOT NULL,
`MSGID_PROD` varchar(250) DEFAULT NULL,
`MSGID_SEQ` bigint(20) DEFAULT NULL,
`EXPIRATION` bigint(20) DEFAULT NULL,
`MSG` longblob,
`PRIORITY` bigint(20) DEFAULT NULL,
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`),
KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
KEY `ACTIVEMQ_MSGS_XIDX` (`XID`),
KEY `ACTIVEMQ_MSGS_IIDX` (`ID`,`XID`,`CONTAINER`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
代码验证并查看数据库情况
生产者代码中设置
// 设置持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
点对点类型中,当DeliveryMode设置为NON_PERSISTENT时,消息被保存在内存中;当设置为PERSISTENT时,消息保存在broker的相应的文件或者数据库中。
点对点会在数据库的数据表
ACTIVEMQ_MSGS
表中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除。测试主题时,要先运行消费者,后运行生产者。
但是对于主题,订阅方式接受到的消息,会在ACTIVEMQ_MSGS表中存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。
-
小总结
-
queue
在没有消费者消费的情况下会将消息保存在activemq_msgs表中,只要有任意一个消费者已经消费过了,消费之后这些消息将会立即被删除。
-
topic
一般是先启动消费订阅然后再生产的情况下会将消息保存到activemq_acks。
-
-
开发有坑
-
数据库jar包
需要将相关jar放入ActiveMQ安装路径下的lib目录。mysql驱动jar和数据库连接池jar。
-
createTablesOnStartup属性
启动完成,自动创建好表后可以修改为false。
-
注意下划线
报错"java.lang.IllegalStateException: BeanFactory not initialized or already closed"
这是因为操作系统的机器名中有"_",更改机器名重启后即可。
-
9.2.5. JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。
如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
配置方式
修改配置文件:
修改前:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
修改后:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data" />
</persistenceFactory>
按照视频教程实现失败,未解决,错误信息如下:
2020-01-30 21:15:48,506 | WARN | Could not get JDBC connection: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.) | org.apache.activemq.store.jdbc.JDBCPersistenceAdapter | main
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:669)[commons-dbcp2-2.7.0.jar:2.7.0]
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:544)[commons-dbcp2-2.7.0.jar:2.7.0]
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:753)[commons-dbcp2-2.7.0.jar:2.7.0]
at org.apache.activemq.store.jdbc.TransactionContext.lockAndWrapped(TransactionContext.java:70)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:63)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.loadAdapter(JDBCPersistenceAdapter.java:454)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.createAdapter(JDBCPersistenceAdapter.java:437)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.getAdapter(JDBCPersistenceAdapter.java:386)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.init(JDBCPersistenceAdapter.java:299)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.LockableServiceSupport.preStart(LockableServiceSupport.java:89)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:54)[activemq-client-5.15.11.jar:5.15.11]
at org.apache.activemq.store.journal.JournalPersistenceAdapter.start(JournalPersistenceAdapter.java:289)[activemq-jdbc-store-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.BrokerService.doStartPersistenceAdapter(BrokerService.java:687)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.BrokerService.startPersistenceAdapter(BrokerService.java:671)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.BrokerService.start(BrokerService.java:635)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73)[activemq-spring-5.15.11.jar:5.15.11]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_161]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1759)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1696)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1626)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)[spring-beans-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)[spring-context-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)[spring-context-4.3.24.RELEASE.jar:4.3.24.RELEASE]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)[xbean-spring-4.14.jar:4.14]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)[xbean-spring-4.14.jar:4.14]
at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)[activemq-spring-5.15.11.jar:5.15.11]
at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)[activemq-spring-5.15.11.jar:5.15.11]
at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)[activemq-spring-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)[activemq-broker-5.15.11.jar:5.15.11]
at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)[activemq-console-5.15.11.jar:5.15.11]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.11.jar:5.15.11]
at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)[activemq-console-5.15.11.jar:5.15.11]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.11.jar:5.15.11]
at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)[activemq-console-5.15.11.jar:5.15.11]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_161]
at org.apache.activemq.console.Main.runTaskClass(Main.java:262)[activemq.jar:5.15.11]
at org.apache.activemq.console.Main.main(Main.java:115)[activemq.jar:5.15.11]
9.2.6. 持久化机制小总结
-
持久化消息是指:
MQ 所在的服务器down 了消息也不会丢失
-
持久化机制演化过程:
从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。
- ActiveMQ 消息持久化机制有:
持久化机制 | 说明 |
---|---|
AMQ | 基于日志文件 |
KahaDB | 基于日志文件,5.4 之后的默认持久化 |
JDBC | 基于第三方数据库 |
LevelDB | 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB |
ReplicatedLevelDB Store | 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案 |
无论使用哪种持久化方式,消息的存储逻辑都是一致的。