zoukankan      html  css  js  c++  java
  • ActiveMQ Topic消息重发

    MQ学习系列:

    1. 消息队列概念与认知
    2. ActiveMQ Topic消息重发
    3. ActiveMQ Topic 持久化订阅
    4. zookeeper+ActiveMQ集群实现高可用

    一、ActiveMQ Topic 消息重发

    准备工作

    windows下ActiveMQ的下载与启动

    启动错误以及解决方案

    activeMQ启动错误 BeanFactory not initialized

    JMS 消息确认机制

    在session接口中定义的几个常量:

    • AUTO_ACKNOWLEDGE = 1 自动确认
    • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
    • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
    • SESSION_TRANSACTED = 0 事务提交并确认

    代码实现

    消息消费端在创建Session对象时需要指定应答模式为客户端手动应答,当消费者获取到消息并成功处理后需要调用message.acknowledge()方法进行应答,通知Broker消费成功。如果处理过程中出现异常,需要调用session.recover()通知Broker重复消息,默认最多重复6次。

    1. 创建maven项目引入依赖
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.8</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    1. 编写测试方法模拟【无消息重发的正常情况】
    package org.newmean;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class ActiveMQTest {
        //消息发送方-producter
        @Test
        public void test1() throws JMSException {
            //创建连接工厂对象
            ConnectionFactory connectionFactory = new 		ActiveMQConnectionFactory("tcp://localhost:61616");
            //从工厂中获取一个连接对象
            Connection connection = connectionFactory.createConnection();
            //连接MQ服务
            connection.start();
            //获取session对象
            //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //通过session创建Topic
            Topic topic = session.createTopic("TestTopic");
            //通过session创建消息发送者
            MessageProducer producer = session.createProducer(topic);
            //通过session创建消息对象
            TextMessage message = session.createTextMessage("hello");
            //发送消息
            producer.send(message);
            //关闭资源
            producer.close();
            session.close();
            connection.close();
        }
        //消息接收方-consumer
        @Test
        public void test2() throws JMSException {
            //创建连接工厂对象
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            //从工厂中获取一个连接对象
            Connection connection = connectionFactory.createConnection();
            //连接MQ服务
            connection.start();
            //获取session对象
            //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //通过session创建Topic
            Topic topic = session.createTopic("TestTopic");
            //通过session创建消费者
            MessageConsumer consumer = session.createConsumer(topic);
            //指定消息监听器
            consumer.setMessageListener(new MessageListener() {
                //当我们监听的topic中存在消息,onMessage这个方法就会自动运行
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到了消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //因为要接收消息不能关闭,同时线程不能死掉
            while (true){
    
            }
    
        }
    }
    
    

    先启动test2方法发起订阅“TestTopic”消息,然后启动test1方法,这时消费者收到了消息。

    1. 消息重发模拟

      我们只需要更消息接收方的代码,改动如下:

    //消息接收方-consumer
        @Test
        public void test2() throws JMSException {
            //创建连接工厂对象
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            //从工厂中获取一个连接对象
            Connection connection = connectionFactory.createConnection();
            //连接MQ服务
            connection.start();
            //获取session对象
            //参数说明 b 是否使用事务 i jms消息确认机制 1 2 3 0 用常量表示
            final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //通过session创建Topic
            Topic topic = session.createTopic("TestTopic");
            //通过session创建消费者
            MessageConsumer consumer = session.createConsumer(topic);
            //指定消息监听器
            consumer.setMessageListener(new MessageListener() {
                //当我们监听的topic中存在消息,onMessage这个方法就会自动运行
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        if(textMessage.getText().equals("nihao")){
                            System.out.println("消费者接收到了消息:"+textMessage.getText());
                            message.acknowledge();
                        }else {
                            System.out.println("消息处理失败了..");
                            session.recover();
                        }
    
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //因为要接收消息不能关闭,同时线程不能死掉
            while (true){
    
            }
    
        }
    

    先启动test2方法发起订阅“TestTopic”消息,然后启动test1方法,这时消费者就会调用session.recover()方法让消息发布者重发消息默认6次,我们能够看到7条(第一次+重发六次)“消息处理失败了..”输出。

  • 相关阅读:
    cache buffers chains ,buffer busy waits
    关于I/O的一些脚本
    模拟buffer busy waits等待事件
    找出热点块所属的用户,对象名,类型
    找到引起磁盘排序的SQL
    db file parallel write,write complete waits
    free buffer waits
    检查日志文件是否传输到备用数据库
    模拟direct path read 等待事件
    3系统启动后的 wifi 加载过程
  • 原文地址:https://www.cnblogs.com/nm666/p/10409984.html
Copyright © 2011-2022 走看看