zoukankan      html  css  js  c++  java
  • ActiveMQ使用JDBC持久化

        步骤一:创建一个数据库
                步骤二:配置activemq.xml配置文件
                    1.在persistenceAdapter加入如下配置
                     

    <!--createTablesOnStartup    启动是否创建表  第一次为true 后续为false-->
    <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />

          第一次为true是为了创建表,之后的每次都不创建,使用第一次创建的表保存数据


        
                    2.配置数据源
                        在beans节点中
                      

     <bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource">
                          <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                          <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq"/>
                          <property name="username" value="root"/>
                          <property name="password" value="root"/>
                          <property name="maxActive" value="200"/>
                          <property name="poolPreparedStatements" value="true"/>
                        </bean>


                        
                    3.将数据库连接Jar放到activemq解压的lib文件夹下


                    
                步骤三:重新启动activemq

        重启后刷新数据库,会生成三张表

    activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

    ID:自增的数据库主键

    CONTAINER:消息的Destination

    MSGID_PROD:消息发送者客户端的主键

    MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID

    EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

    MSG:消息本体的Java序列化对象的二进制数据

    PRIORITY:优先级,从0-9,数值越大优先级越高

     

    activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

    主要的数据库字段如下:

    CONTAINER:消息的Destination

    SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

    CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分

    SUB_NAME:订阅者名称

    SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

    LAST_ACKED_ID:记录消费过的消息的ID。

     

    表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,

    其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
    这个表用于记录哪个Broker是当前的Master Broker。

    启动生产者生产一条消息

    public static void main(String[] args) throws JMSException {
            //创建MQ连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话工厂
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //创建队列
            Queue queue = session.createQueue("wdksoft_queue");
            //创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            //消息持久化
            producer.setDeliveryMode(2);
            //模拟消息
            TextMessage message = session.createTextMessage("hello activeMQ");
            //发送消息
            producer.send(message);
    
            System.out.println("生产者发送消息完毕~");
    
            session.close();
            connection.close();
        }

    刷新msgs数据表

     数据表中就多了一条刚才的信息

      当启动消费者消费了这条信息时,数据库便不再保存这条记录

    public static void main(String[] args) throws JMSException {
            // ConnectionFactory :连接工厂,JMS 用它创建连接
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
            // JMS 客户端到JMS Provider 的连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Session: 一个发送或接收消息的线程
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination :消息的目的地;消息发送给谁.
            Destination destination = session.createQueue("wdksoft_queue");
            // 消费者,消息接收者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                //监听消息
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println("收到消息:" + message.getText());
                } else
                    break;
            }
            session.close();
            connection.close();
        }

  • 相关阅读:
    五、appium自动化之模拟点击、滑动等用户行为
    三、jMeter测试,jason Ectractor和正则表达式获取数组类型
    四、appium实现九宫格滑动和双指缩放操作--TouchAction/MultiAction
    三、定时器--强制等待、显式等待和隐式等待
    一、jMeter实现文件的上传和下载
    二、appium+python find_element定位元素的方法
    二、jMeter测试之输出测试结果到excel文件(jxl.jar)
    一、appium自动化测试--初始化设置
    fourSum
    letterCombinations
  • 原文地址:https://www.cnblogs.com/chx9832/p/12308250.html
Copyright © 2011-2022 走看看