zoukankan      html  css  js  c++  java
  • ActiveMQ持久化机制和JMS可靠消息

    1、ActiveMQ持久化机制

    1.1 JDBC将数据持久化到数据库

    1.2 AMQ生成日志文件

    1.3 KahaDB:本次磁盘生成数据文件(默认)

    1.4 LevelDB:谷歌K/V数据库

    1.5 默认不持久化,正常的开发模式当中,是需要持久化的  

      DeliveryMode.NON_PERSISTENT = 1 不开启持久化
      DeliveryMode.PERSISTENT = 2 开启持久化
      

    1.6 使用JDBC持久化  

    步骤一:创建一个数据库

        

    步骤二:配置activemq.xml配置文件 

    1.在persistenceAdapter加入如下配置
        <!--createTablesOnStartup 启动是否创建表 第一次为true 后续为false-->
        <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />
      

      2.配置数据源

      

      3.将数据库连接Jar放到activemq解压的lib文件夹下

      

    步骤三:重新启动ActiveMQ

      

      数据表解释:

      activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

        主要的数据库字段如下:

          CONTAINER:消息的Destination

          SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

          CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用于区分

          SUB_NAME:订阅者名称

          SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

          LAST_ACKED_ID:记录最后消费过的消息的ID

      activemq_lock:在集群环境下才有用,只有一个Broker可以获得消息,称为Master Broker

      activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中;

        主要的数据库字段如下:

          CONTAINER:消息的Destination

          MSGID_PROD:消息发送者客户端的主键

          MSG_SEQ:是发送消息的顺序,MSGID+PROD+MSG_SEQ可以组成JMS的MessageID

          EXPIRATION:消息的过期时间

          MSG:消息本体的java序列化对象的二进制数据

          PRIORITY:优先级,从0-9,数值越大优先级越高

    步骤四:启动提供者后数据表activemq_msgs便增加一条数据

      

       

    步骤五:消费者启动后数据表消息被删除

      由于消费者已经接受消息,队列中数据删除了,所以数据库中的数据也被删除了

      

    2、JMS可靠消息

    2.1 带事务的Session

      ①生产者必须在生产完数据后手动提交session

      

    package com.zn.p2p;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 点对点提供者
     */
    public class P2P_Provider {
        public static void main(String[] args) throws JMSException {
            //步骤一:创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory();
            //步骤二:创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //步骤三:启动连接
            connection.start();
            //步骤四:获取会话工厂
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //步骤五:创建队列
            Queue quque = session.createQueue("MyQueue");
            //创建消息生产者
            MessageProducer producer = session.createProducer(quque);
            //消息持久化  参数1为不做持久化  2为持久化
            producer.setDeliveryMode(2);
            //模拟消息
            TextMessage textMessage = session.createTextMessage("Hello ActiveMQ!");
            //发送消息
            producer.send(textMessage);
            //手动提交消息
            session.commit();
            System.out.println("生产者生成消息完毕!");
            //回收资源
            session.close();
            connection.close();
        }
    }

      ②消费者在消费完数据之后也必须手动提交Session

      

    package com.zn.p2p;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * P2P消费者
     */
    public class P2P_Consumer {
        public static void main(String[] args) throws JMSException {
            //步骤一:创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //步骤二:创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //步骤三:启动连接
            connection.start();
            //步骤四:获取会话工厂
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //步骤五:创建队列
            Queue quque = session.createQueue("MyQueue");
            //创建消费者
            MessageConsumer consumer = session.createConsumer(quque);
            //循环获取消息
            while (true){
                TextMessage message = (TextMessage)consumer.receive();
                session.commit();
                if (message!=null){
                    System.out.println("消费者获取信息:"+message.getText());
                }else {
                    break;
                }
            }
            //回收资源
            session.close();
            connection.close();
        }
    }

    2.2 不带事务的Session:

      ①自动签收(不靠谱)

      

      ②手动签收

      

      消费者必须显示调用手动签收的方法进行签收,否则队列当中还是存在数据

      

  • 相关阅读:
    背水一战 Windows 10 (26)
    背水一战 Windows 10 (25)
    背水一战 Windows 10 (24)
    背水一战 Windows 10 (23)
    背水一战 Windows 10 (22)
    背水一战 Windows 10 (21)
    背水一战 Windows 10 (20)
    背水一战 Windows 10 (19)
    背水一战 Windows 10 (18)
    背水一战 Windows 10 (17)
  • 原文地址:https://www.cnblogs.com/Zzzzn/p/12307408.html
Copyright © 2011-2022 走看看