zoukankan      html  css  js  c++  java
  • ActiveMQ持久化消息

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式:

    一、持久化为文件

    ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了。

    打开安装目录下的配置文件:

    D:ActiveMQapache-activemqconfactivemq.xml在越80行会发现默认的配置项:

            <persistenceAdapter>

                <kahaDB directory="${activemq.data}/kahadb"/>

            </persistenceAdapter>

    注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器。

         他的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。

    然后再发送消息的时候改变第二个参数为:

    MsgDeliveryMode.Persistent

    Message保存方式有2种
    PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
    NON_PERSISTENT:保存到内存,消费之后message被清除。
    注意:堆积的消息太多可能导致内存溢出。

    然后打开生产者端发送一个消息:

    wps30F4.tmp

    不启动消费者端,同时在管理界面查看:

    wps3105.tmp

    发现有一个消息正在等待,这时如果没有持久化,ActiveMQ宕机后重启这个消息就是丢失,而我们现在修改为文件持久化,重启ActiveMQ后消费者仍然能够收到这个消息。

    wps3106.tmp

    二、持久化为数据库

    我们从支持Mysql为例,先从http://dev.mysql.com/downloads/connector/j/下载mysql-connector-java-5.1.34-bin.jar包放到:

    D:ActiveMQapache-activemqlib目录下。

    打开并修改配置文件:

    复制代码
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
                <kahaDB directory="${activemq.data}/kahadb"/>
            -->
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
            </persistenceAdapter>
    
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information, see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    
        </broker>
      <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value=""/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>
    <!-- END SNIPPET: example -->
    复制代码

    重启ActiveMQ打开phpmyadmin发现多了3张表:

    wps3107.tmp

    然后启动生产者(不启动消费者)

    在Mysql中可以找到这条消息:

    wps3108.tmp

    关掉ActiveMQ并重启,模拟宕机。

    然后启动消费者:

    wps3118.tmp

    然后发现Mysql中已经没有这条消息了。

  • 相关阅读:
    [Unity] 2D开发学习教程
    [Unity] 查找资源
    [Unity] UGUI研究院之游戏摇杆
    [Unity] Unity3D研究院编辑器之自定义默认资源的Inspector面板
    [Unity] Unity3D研究院编辑器之独立Inspector属性
    [Unity] 精灵动画制作中需要注意的一些问题
    [Unity] 常用技巧收集
    IDEA相关设置
    Hive配置文件hive-site.xml
    MySql通用二进制版本在Linux(Ubuntu)下安装与开启服务
  • 原文地址:https://www.cnblogs.com/Jeely/p/10784841.html
Copyright © 2011-2022 走看看