zoukankan      html  css  js  c++  java
  • ActiveMQ持久化机制

    为什么要进行持久化

      为了避免MQ服务器意外宕机导致数据丢失,需要做到重启后没有被消费的数据依然在消息队列中。

    ActiveMQ的持久化机制包含:

      1. JDBC:持久化到数据库

      2. AMQ:日志文件

      3. KahaBD:AMQ基础上改进,默认选择

      4. LevelDB:谷歌K/V数据库

      注:ActiveMQ默认是不开启持久化的。

    ActiveMQ默认持久化机制

        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter>

      性能:

        AMQ的性能改与JDBC的持久化机制,由于是在文件中追加写入消息,所以性能比较高。并且创建了消息主键索引和缓存索引机制以提升性能。

      缺点:

        AMQ会为每一个Destination创建一个索引,若创建了大小的消息队列,则磁盘占用会非常大,所以由于索引文件比较大,当Broker崩溃后,重建所以速度比较慢。  

    JDBC持久化机制

      步骤一:创建数据库

        

      步骤二:在程序中开启持久化操作

    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        PERSISTENT:代表开启持久化

        NON_PERSISTENT:代表不开启持久化

      步骤三:配置activemq.xml文件

        文件位置在conf目录下

        3.1 在persistenceAdapter加入如下配置

         <persistenceAdapter>
            <!--第一次创建是createTablesOnStartup属性为true,之后创建完成后将属性该为false,一直为true的话,每次启动的时候都会创建新表 --> <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" /> </persistenceAdapter>

        3.2 配置数据源

      <bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq"/> <property name="username" value="root"/> <property name="password" value="123"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>

        3.3 添加依赖包

          由于JDBC持久话方式需要连接数据库,所以需要在ActiveMQ安装目录的lib目录下添加如下jar包:

          

      步骤四:重启

        4.1重新启动activeMQ

          重启之后数据库中将会自动创建三张表

          

          activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

            主要的数据库字段如下:

              CONTAINER:消息的Destination

              SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

              CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用于区分

              SUB_NAME:订阅者名称

              SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

              LAST_ACKED_ID:记录最后消费过的消息的ID

          activemq_lock:在集群环境下才有用,只有一个Broker可以获得消息,称为Master Broker

          activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中;

            主要的数据库字段如下:

              CONTAINER:消息的Destination

              MSGID_PROD:消息发送者客户端的主键

              MSG_SEQ:是发送消息的顺序,MSGID+PROD+MSG_SEQ可以组成JMS的MessageID

              EXPIRATION:消息的过期时间

              MSG:消息本体的java序列化对象的二进制数据

              PRIORITY:优先级,从0-9,数值越大优先级越高

        4.2 启动生产者和消费者,测试效果

          启动生产者:

            

            生产者发送完成,下面可以查看一下队列中的数据

            

            因为上面做配置将数据保存在数据库中,下面可以查看一下数据库中的数据

            

          关闭ActiveMQ服务测试数据有没有真正的持久化

            关闭之后页面时无法访问的;

            

            关闭之后刷新数据库,数据依然存在,这是数据已经持久化了

          启动ActiveMQ服务以及消费者:

            

             启动消费者时,数据已经成功接收到了

            

            因为消费者已经接受到消息了,所以队列中的待处理数据就变成了0,并且现在的消费者是1;

            下面可以查看一个数据库中的数据

            

            由于消费者已经接受消息,队列中数据删除了,所以数据库中的数据也被删除了

          结论:生产者就数据发送到队列中,如果不做持久化操作,ActiveMQ服务器宕机,数据就会清空;如果做了持久化操作,ActiveMQ服务器宕机之后再次启动,没有被消费者消费的消息就会一直在数据库中,等消费者消费完消息后,数据库中的数据才会被删除。

  • 相关阅读:
    ansible自动化运维04
    ansible自动化运维03
    ansible自动化运维02
    ansible自动化运维01
    mysql innodb存储引擎和一些参数优化
    Mysql 数据库常用配置命令
    zabbix server3.4 使用mailx配置邮件报警
    32_redis cluster的核心原理分析:gossip通信、jedis smart定位、主备切换
    31_redis cluster的自动化slave迁移实现更强的高可用架构的部署方案
    30_redis cluster通过master水平扩容来支撑更高的读写吞吐+海量数据
  • 原文地址:https://www.cnblogs.com/wnwn/p/12307290.html
Copyright © 2011-2022 走看看