  • 3. Persisting ActiveMQ messages to a MySQL database

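    1. Copy the ActiveMQ server configuration into a new project created in MyEclipse, as shown in the original screenshot.

    2. By default ActiveMQ stores messages in KahaDB. Comment out the KahaDB configuration and replace it with a MySQL configuration.

      Find activemq.xml and change it as follows: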

        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds"/>
        </persistenceAdapter>

        This configuration tells the broker to use the bean whose id is "mysql-ds" as the MySQL data source.
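
    As a hedged variation on the adapter above, the sketch below spells out the createTablesOnStartup attribute, which controls whether the JDBC adapter tries to create its tables when the broker starts. The original post does not set this attribute; it is added here only as an assumption about how the table-creation behaviour could be made explicit (ActiveMQ enables it by default).

    ```xml
    <persistenceAdapter>
        <!-- dataSource="#mysql-ds" refers to the Spring bean with id "mysql-ds" -->
        <!-- createTablesOnStartup is not in the original post; shown for illustration -->
        <jdbcPersistenceAdapter dataDirectory="${activemq.data}"
                                dataSource="#mysql-ds"
                                createTablesOnStartup="true"/>
    </persistenceAdapter>
    ```

    Once the tables exist, switching the attribute to false should skip the creation attempt on later starts.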

      

    3. Configure the MySQL data source

          After the closing </broker> tag, add the MySQL data source configuration:

        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://127.0.0.1/activemq?relaxAutoCommit=true"/>
            <property name="username" value="admin"/>
            <property name="password" value="admin"/>
            <property name="maxActive" value="200"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>

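     (Note: this is ordinary Spring-style bean configuration. The bean id must match the dataSource referenced by the persistence adapter; the leading # in "#mysql-ds" marks it as a bean reference.)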
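
    With newer library versions the bean above may need adjusting. The sketch below assumes MySQL Connector/J 8 and Commons DBCP 2, neither of which the original post uses: Connector/J 8 names its driver class com.mysql.cj.jdbc.Driver, and DBCP 2 moves the data source into the org.apache.commons.dbcp2 package and renames maxActive to maxTotal. Port 3306 is MySQL's default and is written out only for clarity; the credentials are the post's own placeholder values.

    ```xml
    <!-- Assumes Connector/J 8 and commons-dbcp2 are on the broker's classpath -->
    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="admin"/>
        <property name="password" value="admin"/>
        <!-- maxTotal is the DBCP 2 name for what DBCP 1 called maxActive -->
        <property name="maxTotal" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    ```

    Whichever variant is used, the activemq database named in the URL has to exist before the broker starts, since the adapter creates tables but not the database itself.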

    4. Add the MySQL driver JAR to the lib directory.

    5. The complete configuration is as follows:

    <!-- START SNIPPET: example -->
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>

       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>

        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">

            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">">
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>

            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>

            <!--         <persistenceAdapter> -->
            <!--             <kahaDB directory="${activemq.data}/kahadb"/> -->
            <!--         </persistenceAdapter> -->

            <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds"/>
            </persistenceAdapter>

            <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>

            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>

            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
        </broker>

        <!-- MySQL data source referenced by the jdbcPersistenceAdapter above -->
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://127.0.0.1/activemq?relaxAutoCommit=true"/>
            <property name="username" value="admin"/>
            <property name="password" value="admin"/>
            <property name="maxActive" value="200"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>

        <import resource="jetty.xml"/>

    </beans>
    <!-- END SNIPPET: example -->

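    6. Start ActiveMQ

    In Run Configurations, right-click Java Application and choose New. In the Main class field, enter org.apache.activemq.console.Main, which is ActiveMQ's startup class.

    On the Arguments tab, enter start as the program argument, then run the configuration.

    7. After a successful start, three tables appear in the database: activemq_acks, activemq_lock, and activemq_msgs (the table that holds persisted messages).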

  • Original post: https://www.cnblogs.com/rosydawn/p/4533001.html