zoukankan      html  css  js  c++  java
  • JMS消息可靠机制

    ActiveMQ消息签收机制:

    客戶端成功接收一条消息的标志是一条消息被签收,成功应答。

    消息的签收情形分两种:

    1、带事务的session

       如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

    2、不带事务的session

       不带事务的session的签收方式,取决于session的配置。

       Activemq支持一下三種模式:

       Session.AUTO_ACKNOWLEDGE  消息自动签收

       Session.CLIENT_ACKNOWLEDGE  客戶端调用acknowledge方法手动签收

    textMessage.acknowledge();//手动签收

       Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息

    只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。 在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

    Number Of Consumers  消费者 这个是消费者端的消费者数量 

    Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数 
    Messages Enqueued 进入队列的消息  进入队列的总数量,包括出队列的。 这个数量只增不减 
    Messages Dequeued 出了队列的消息  可以理解为是消费这消费掉的数量 

    默认JMS默认自动签收,消费者获取到消息之后,不管消费者对该消息处理业务逻辑是否成功,都会默认已经消费的。

    手动签收模式,消息中间件将消息推送给消费者,消费者接收到消息之后,必须手动发送命令告诉消息中间件已经消费成功

    当消息被消费了,消息中间件依然还存在着呢

     

    需要调用:

    textMessage.acknowledge();//手动签收

    手动签收

    生产者依然:  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    //自动签收

    但是消费者 :Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);  //手动签收

      textMessage.acknowledge();  //告诉中间件 已经消费了

    完整代码

    package com.toov5.producer;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    public class producerTest {
        // mq通讯地址
        private static String url = "tcp://192.168.91.6:61616";
        // 队列名称
        private static String queueName = "toov5_queue";
    
        public static void main(String[] args) throws JMSException {
            // 先创建连接工厂 密码默认采用admin admin
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            // 创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 是否需要事务方式提交 消费方式默认自动签收
            // 拿到session 创建目标 创建队列
            Queue queue = session.createQueue(queueName);
            // 创建生产者
            MessageProducer producer = session.createProducer(queue); // 生产者生产的消息 是放在这个queue里面的
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);  //默认非持久化的 设置持久化
            for (int i = 0; i < 10; i++) {
                // 拿到队列 创建消息
                TextMessage textMessage = session.createTextMessage("消息内容" + i);
                // 发送消息
                producer.send(textMessage);
            }
            // 关闭连接
            connection.close();
            System.out.println("消息发送完毕");
        }
    
    }
    package com.toov5.producer;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    public class consumerTest {
        // mq通讯地址
        private static String url = "tcp://192.168.91.6:61616";
        // 队列名称
        private static String queueName = "toov5_queue";
    
        public static void main(String[] args) throws JMSException {
            // 先创建连接工厂 密码默认采用admin admin
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            // 创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 是否需要事务方式提交 消费方式默认自动签收
            // 拿到session 创建目标 创建队列
            Queue queue = session.createQueue(queueName);
            
            
             
            //创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //启动监听 监听消息
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    //强制转换
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("consumer 消费 producer:"+textMessage.getText());
                        textMessage.acknowledge();  //告诉中间件 已经消费了
                    } catch (JMSException e) {
                        
                        e.printStackTrace();
                    }
                }
            });
             //监听时候 不要关闭连接 关闭就不监听了 一只处于监听状态 (长连接)
             
        }
    
    }

    以事务形式发送或者签收 (要不中间件是没有这些消息的)

    producer:

    package com.toov5.producer;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    public class producerTest {
        // mq通讯地址
        private static String url = "tcp://192.168.91.6:61616";
        // 队列名称
        private static String queueName = "toov5_queue";
    
        public static void main(String[] args) throws JMSException {
            // 先创建连接工厂 密码默认采用admin admin
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            // 创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 以事务方式提交 消费方式默认自动签收
            // 拿到session 创建目标 创建队列
            Queue queue = session.createQueue(queueName);
            // 创建生产者
            MessageProducer producer = session.createProducer(queue); // 生产者生产的消息 是放在这个queue里面的
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);  //默认非持久化的 设置持久化
            for (int i = 0; i < 10; i++) {
                // 拿到队列 创建消息
                TextMessage textMessage = session.createTextMessage("消息内容" + i);
                // 发送消息
                producer.send(textMessage);
                session.commit(); //提交事务
            }
            // 关闭连接
            connection.close();
            System.out.println("消息发送完毕");
        }
    
    }

    consumer

      

    package com.toov5.producer;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    public class consumerTest {
        // mq通讯地址
        private static String url = "tcp://192.168.91.6:61616";
        // 队列名称
        private static String queueName = "toov5_queue";
    
        public static void main(String[] args) throws JMSException {
            // 先创建连接工厂 密码默认采用admin admin
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            // 创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话
            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 是否是以事务方式提交 消费方式默认自动签收 启动事务
            // 拿到session 创建目标 创建队列
            Queue queue = session.createQueue(queueName);
            
            
             
            //创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //启动监听 监听消息
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    //强制转换
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("consumer 消费 producer:"+textMessage.getText());
    //                    textMessage.acknowledge();
                        session.commit();   //提交事务
                    } catch (JMSException e) {
                        
                        e.printStackTrace();
                    }
                }
            });
             //监听时候 不要关闭连接 关闭就不监听了 一只处于监听状态 (长连接)
             
        }
    
    }

     以上所用到的pom依赖

        <dependencies>
    		<dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-core</artifactId>
    			<version>5.7.0</version>
    		</dependency>
         </dependencies>
    

      

    如果生产者事务形式提交消息,消费者以事务形式接受消息

    消费者 第一次运行, 但是没有标记已消费

                第二次运行,如果生产者有先的消息继续发送,消费者接收每个消息都commit,标记为已消费。  (自己试玩玩把)

     手动签收比较推荐下哈~

  • 相关阅读:
    Nginx+keepalived高可用配置
    kubespahere安装kubenetes
    Fastdfs原理及集群搭建
    Spark 3.0.3集群安装文档
    Mybatis Plus 代码生成器
    redis集群方案
    Go 语言并发之道
    重构-改善即有代码的设计
    QT线程
    QT中的cout
  • 原文地址:https://www.cnblogs.com/toov5/p/9937165.html
Copyright © 2011-2022 走看看