zoukankan      html  css  js  c++  java
  • ActiveMQ持久化消息(转)

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式:

    一、持久化为文件

    ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了。

    打开安装目录下的配置文件:

    D:ActiveMQapache-activemqconfactivemq.xml在越80行会发现默认的配置项:

            <persistenceAdapter>

                <kahaDB directory="${activemq.data}/kahadb"/>

            </persistenceAdapter>

    注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器。

         他的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。

    然后再发送消息的时候改变第二个参数为:

    MsgDeliveryMode.Persistent

    Message保存方式有2种
    PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
    NON_PERSISTENT:保存到内存,消费之后message被清除。
    注意:堆积的消息太多可能导致内存溢出。

    然后打开生产者端发送一个消息:

    wps30F4.tmp

    不启动消费者端,同时在管理界面查看:

    wps3105.tmp

    发现有一个消息正在等待,这时如果没有持久化,ActiveMQ宕机后重启这个消息就是丢失,而我们现在修改为文件持久化,重启ActiveMQ后消费者仍然能够收到这个消息。

    wps3106.tmp

    二、持久化为数据库

    我们从支持Mysql为例,先从http://dev.mysql.com/downloads/connector/j/下载mysql-connector-java-5.1.34-bin.jar包放到:

    D:ActiveMQapache-activemqlib目录下。

    打开并修改配置文件:

    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
                <kahaDB directory="${activemq.data}/kahadb"/>
            -->
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
            </persistenceAdapter>
    
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information, see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    
        </broker>
      <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value=""/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>
    <!-- END SNIPPET: example -->

    重启ActiveMQ打开phpmyadmin发现多了3张表:

    wps3107.tmp

    然后启动生产者(不启动消费者)

    在Mysql中可以找到这条消息:

    wps3108.tmp

    关掉ActiveMQ并重启,模拟宕机。

    然后启动消费者:

    wps3118.tmp

    然后发现Mysql中已经没有这条消息了。

  • 相关阅读:
    Sql Server 2008卸载后再次安装一直报错
    listbox 报错 Cannot have multiple items selected when the SelectionMode is Single.
    Sql Server 2008修改Sa密码
    学习正则表达式
    Sql Server 查询第30条数据到第40条记录数
    Sql Server 复制表
    Sql 常见面试题
    Sql Server 简单查询 异步服务器更新语句
    jQuery stop()用法以及案例展示
    CSS3打造不断旋转的CD封面
  • 原文地址:https://www.cnblogs.com/HappyEDay/p/6069211.html
Copyright © 2011-2022 走看看