zoukankan      html  css  js  c++  java
  • ActiveMQ测试代码和持久化总结

    ActiveMQ的安装控制台访问这里就不介绍了,直接上代码

    一、消息(queue)的测试代码如下

    生产者  JMSProduce

    package com.hanwl.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author hanwl
     * @date 2020/12/13-14:48
     */
    public class JMSProduce {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://localhost:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        private static final String QUEUE_NAME = "queue_01";
    
        public static void main(String[] args) throws JMSException {
    
            //1.创建连接工厂 ,给定url,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKERURL);
    
            //2.通过连接工厂获取连接connection并启动
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3.创建回话session 两个参数 第一个叫事务,第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4.创建目的地 是队列还是主题topic
            Queue queue = session.createQueue(QUEUE_NAME);
    
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
    
            //设置持久化:NON_PERSISTENT宕机消息丢失、PERSISTENT宕机消息保存,队列默认是PERSISTENT
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            //6.通过messageProducer发生3条消息
            for(int i=1;i<=3;i++){
    
                //7.创建消息
                TextMessage textMessage = session.createTextMessage("msg" + i);
                //textMessage.setStringProperty("c01","vip"); //消息属性
                //8.发送消息
                messageProducer.send(textMessage);
    
            }
    
            //9.关闭资源
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("****消息发送完成");
    
        }
    }

    消费者 JMSConsumer

    package com.hanwl.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author hanwl
     * @date 2020/12/13-16:14
     */
    public class JMSConsumer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://localhost:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        private static final String QUEUE_NAME = "queue_01";
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂 ,给定url,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKERURL);
    
            //2.通过连接工厂获取连接connection并启动
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3.创建回话session 两个参数 第一个叫事务,第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4.创建目的地 是队列还是主题topic
            Queue queue = session.createQueue(QUEUE_NAME);
    
            //5.创建消息的消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
    
            //方式一 同步阻塞方式(receive)
            /*while (true){
                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
                if(textMessage!=null){
                    System.out.println("****收到的消息是:"+textMessage.getText());
                }else{
                    break;
                }
            }*/
    
    
            //方式二 通过监听的方式消费消息
            //异步非阻塞方式(监听onMessage),通过setMessageListener注册一个监听器,当消息到达后系统自动调用监听器的onMessage方法
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(message!=null && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("****收到的消息是:"+textMessage.getText());
                            //System.out.println("****收到的消息属性是:"+textMessage.getStringProperty("c01"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.in.read();//等待消费完在关闭
    
    
            //9.关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
            System.out.println("****消息接收完成");
    
            //问题:先开两个消费者监听器,发再生产6条消息后,2个消费者如何分消息?
            //答案:一人一半
        }
    }

    二、主题(topic)的测试代码如下:

    生产者 JmsProduce_Topic

    package com.hanwl.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author hanwl
     * @date 2020/12/13-17:34
     */
    public class JmsProduce_Topic {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://localhost:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        private static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws JMSException {
    
            //1.创建连接工厂 ,给定url,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKERURL);
    
            //2.通过连接工厂获取连接connection并启动
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3.创建回话session 两个参数 第一个叫事务,第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4.创建目的地 是队列还是主题topic
            Topic topic = session.createTopic(TOPIC_NAME);
    
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(topic);
    
            //6.通过messageProducer发生3条消息
            for(int i=1;i<=3;i++){
                //7.创建消息
                TextMessage textMessage = session.createTextMessage("msg" + i);
                //8.发生消息
                messageProducer.send(textMessage);
            }
    
            //9.关闭资源
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("****TOPIC_NAME消息发送到MQ完成");
    
        }
    }

    消费者 JmsConsumer_Topic

    package com.hanwl.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author hanwl
     * @date 2020/12/13-17:44
     */
    public class JmsConsumer_Topic {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://localhost:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        private static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws JMSException, IOException {
    
            System.out.println("我是1号消费者:");
            //1.创建连接工厂 ,给定url,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKERURL);
    
            //2.通过连接工厂获取连接connection并启动
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3.创建回话session 两个参数 第一个叫事务,第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4.创建目的地 是队列还是主题topic
            Topic topic = session.createTopic(TOPIC_NAME);
    
            //5.创建消息的消费者
            MessageConsumer messageConsumer = session.createConsumer(topic);
    
    
            //方式一 同步阻塞方式(receive)
            /*while (true){
                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
                if(textMessage!=null){
                    System.out.println("****收到的消息是:"+textMessage.getText());
                }else{
                    break;
                }
            }*/
    
            //方式二 通过监听的方式消费消息
            //异步非阻塞方式(监听onMessage),通过setMessageListener注册一个监听器,
            //当消息到达后系统自动调用监听器的onMessage方法
            messageConsumer.setMessageListener((message)->{
                if(message!=null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****收到的消息是:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.in.read();//等待消费完在关闭
    
            //9.关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
            System.out.println("****消息接收完成");
    
            //先启动订阅,后启动生产
        }
    }

    三、和spring整合

    1.maven中引入相关jar包 pom.xml配置如下

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.hanwl.activemq</groupId>
        <artifactId>activemq_demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <!--mq需要的jar包 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.9</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
            
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.10.5</version>
            </dependency>
    
    
            <!--spring整合需要的包-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <!--activemq需要的pool包-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.9</version>
            </dependency>
            <!--spring相关jar包-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jdbc</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-beans</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-orm</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-web</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>4.3.28.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.mchange</groupId>
                <artifactId>c3p0</artifactId>
                <version>0.9.2</version>
            </dependency>
            <dependency>
                <groupId>cglib</groupId>
                <artifactId>cglib</artifactId>
                <version>2.2</version>
            </dependency>
            <dependency>
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
                <version>1.6.8</version>
            </dependency>
    
    
    
            <!--junit/log4j基础通用配置-->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
        </dependencies>
    
    </project>

    2.spring配置文件applicationContext.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--开启包的自动扫描-->
        <context:component-scan base-package="com.hanwl.activemq" />
    
        <!--配置生产者-->
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--真正可以产生connection的connectionFactory 由对应的JMS服务长商提供-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://localhost:61616"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
    
        <!--队列的目的地 点对点-->
        <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
    
        <!--主题 发布订阅-->
        <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
        <!--spring提供的JMS工具类,它可以进行消息的发送和接收-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="jmsFactory" />
            <!--<property name="defaultDestination" ref="activeMQQueue"/>-->
            <property name="defaultDestination" ref="activeMQTopic"/>
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
            </property>
        </bean>
    
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsFactory" />
            <property name="destination" ref="activeMQTopic"/>
            <!--实现 MessageListener 的class-->
            <property name="messageListener" ref="myMessageListener"/>
        </bean>
    
    </beans>

    3.消费者和生产者以及配置文件中的 myMessageListener

    package com.hanwl.activemq.spring;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @author hanwl
     * @date 2020/12/14-21:50
     */
    @Component
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            if(message!=null && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("****收到的消息是:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    
    
    package com.hanwl.activemq.spring;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @author hanwl
     * @date 2020/12/13-23:09
     */
    
    @Service
    public class SpringMQ_Produce {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
            SpringMQ_Produce produce = (SpringMQ_Produce) ctx.getBean("springMQ_Produce");
            /*produce.jmsTemplate.send(new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage("spring和activemq整合的queue");
                    return textMessage;
                }
            });*/
            produce.jmsTemplate.send((session)->{
                TextMessage textMessage = session.createTextMessage("spring和activemq整合的queue for topic2");
                return textMessage;
            });
    
            System.out.println("**********send task over");
        }
    }
    
    
    
    
    package com.hanwl.activemq.spring;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.TextMessage;
    
    /**
     * @author hanwl
     * @date 2020/12/13-23:08
     */
    @Service
    public class SpringMQ_Consumer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
            SpringMQ_Consumer consumer = (SpringMQ_Consumer) ctx.getBean("springMQ_Consumer");
    
            String strMessage = (String) consumer.jmsTemplate.receiveAndConvert();
    
            System.out.println("********消费者收到的消息:"+strMessage);
    
        }
    }

    四、与springboot的整合这里就不展开了,就是配置,也不难。

     

    五、持久化相关总结

    消息的可靠性包含三个方面:持久化、事务、签收。activeMQ中对于投递模式设置为持久化的消息,broker接收到到消息之后,会先把消息存储到存储介质,然后再转发到消息的监听者,activeMQ提供以下几种消息持久化策略

        1.kahaDB 默认持久化的方式(通过文件共享的方式实现集群)
        2.JDBC存储
        3.Memory内在
        4.LevelDb  性能高于kahaDB
        5.JDBC with ActiveMQ journal

    至于这些存储方式是在哪个版本开始出现的,可以查看官网。我们在工作中常用的是kahaDB和JDBC以及JDBC  journal。

    使用时候一般需要开启持久化:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

    KahaDB存储
    KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

    在data/kahadb这个目录下,会生成四个文件,来完成消息持久化
    1.db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
    2.db.redo 用来进行消息恢复
    3. db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较 快的。默认是32M,达到阀值会自动递增
    4.lock文件 锁,写入当前获得kahadb读写权限的broker ,用于在集群环境下的竞争处理

    那么如何配置kahadb的持久化策略呢?在conf/activemq.xml文件里面添加以下配置,不过这个是默认就在里面的配置。

            <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter>

    JDBC存储
    使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:
    activemq_msgs:queue和topic的消息都存在这个表中
    activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
    activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

    那么如何进行配置呢?
    首先需要定义一个数据库连接源,我们使用mysql,定义如下:

       <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
            <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> 
            <property name="username" value="root"/> 
            <property name="password" value="root"/> 
            <property name="poolPreparedStatements" value="true"/> 
       </bean> 
    
    放到activemq的 confactivemq.xml 里面,beans中就行。
    默认使用的是dbcp数据源,所以只需要拷贝一个mysql驱动mysql-connector-java-5.1.37-bin到activemq的 lib 下就行,如果使用的是c3p0或者druid,
    需要同时拷贝它们的相关jar包

    然后使用这个数据源配置持久化策略

    <persistenceAdapter>
         <jdbcPersistenceAdapter dataSource="#mysql-ds" />
     </persistenceAdapter>

    这里使用的dataSource就是上面配置的数据源

    配置完成之后重启activemq,可以看到自动创建了三个表:

    activemq_msgs:queue和topic的消息都存在这个表中
    activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
    activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

    JDBC Message store with ActiveMQ Journal
    这种方式克服了JDBC Store的不足,JDBC存储每次消息过来,都需要去写库和读库。 ActiveMQ Journal,使用延迟存储数据到数据库,当消息来到时先缓存到文件中,延迟后才写到数据库中。

    当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。 举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况 下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的 10%的消息到DB。 如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
    配置方式,先把原来的jdbc持久化配置去掉,加入以下配置

            <persistenceFactory>
                 <journalPersistenceAdapterFactory
                    journalLogFiles="5"
                    journalLogFileSize="32768"
                    useJournal="true"
                    useQuickJournal="true"
                    dataDirectory="activemq-data"
                    dataSource="#mysql-ds"/>
             </persistenceFactory>

    JDBC存储和JDBC with ActiveMQ journal的区别:
           1. JDBC with journal的性能优于jdbc
      2. JDBC用于master/slave模式的数据库分享
      3. JDBC with journal不能用于master/slave模式
      4. 一般情况下,推荐使用jdbc with journal 

    这里说一句,jdbc方式往数据库写消息如果发如下错误 

    java.lang.llegalStateException:BeanFactory not initialized or already closed ,可能是操作系统机器名中有 “_” 符号,改机器名重启可解决。

    最后说一下配置broker的nio方式,也是activemq.xml中配置

    <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <!--<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
                 <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
            </transportConnectors>

    其中注释的部分就nio 还auto+nio方式,在代码中的地址就需要写成

    private static final String BROKERURL = "nio://localhost:61618

    到这里就总结完activemq的使用和持久化了,后续有更新再补充。

  • 相关阅读:
    wince 操作sqlite数据库
    c#数据结构(第二章)
    C#数据结构(第三章)
    近期学习(收藏地址)
    c#数据结构(第四章)
    PowerDesigner 数据库设计
    wince操作远程sqlserver数据库
    一个有趣的算法
    c#数据结构(第一章)
    c#实现显示图片的动态效果
  • 原文地址:https://www.cnblogs.com/loong-hon/p/14147481.html
Copyright © 2011-2022 走看看