zoukankan      html  css  js  c++  java
  • Springboot+ActiveMQ(ActiveMQ消息持久化,保证JMS的可靠性,消费者幂等性)

    ActiveMQ 持久化设置:

    在redis中提供了两种持久化机制:RDB和AOF 两种持久化方式,避免redis宕机以后,能数据恢复,所以持久化的功能 对高可用程序来说 很重要。

    同样在ActiveMQ 中 也提供了持久化的功能,在生产者 生产消息 到队列中,可以通过设置 该消息在队列中是否持久化。持久化以后,即使ActiveMQ重启了,队列中的消息也不会丢失

    java中,在生产者 发送消息的时候可以通过api 设置 

    producer.setDeliveryMode(DeliveryMode.PERSISTENT)
    package com.example.demo.producter;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMqProducter {
        public static String url = "tcp://127.0.0.1:61616";
    
        public static void main(String[] args) throws JMSException {
            // 根据用户名 和密码,地址,获取JMS 的连接工厂 61616端口 是mq服务的端口 而8161 是mq提供的管理后端的端口
            ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
            // 从连接工厂创建一条连接
            Connection connection = connetionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 一个类似 接受 或者发送的线程
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 用session创建一个
            Destination destination = session.createQueue("mq-msg");
            // MessageProducer:消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 设置不持久化 NON_PERSISTENT  PERSISTENT设置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 发送一条消息
            for (int i = 1; i <= 5; i++) {
                sendMsg(session, producer, i);
            }
            connection.close();
    
        }
    
        /**
         * 在指定的会话上,通过指定的消息生产者发出一条消息
         * 
         * @param session
         *            消息会话
         * @param producer
         *            消息生产者
         */
        public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            // 创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
            // 通过消息生产者发出消息
            producer.send(message);
        }
    
    }

    如何保证JMS可靠性:

     JMS:java消息服务,JMS服务客户端之间通过异步的方式进行消息传递,在消息传递的时候,如何保证传输的消息是不是可靠。

     例如:生产者 生产消息发送到队列的时候,如果有异常抛出。消费者从队列中拉取消息消费的时候,程序异常,消费失败。

    ActiviMQ的消息签收机制:客戶端成功接收一条消息的标志是一条消息被签收,成功应答

    消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

    1.自动签收(生产者和消费者 生产消息和消费消息,都是自动完成 )

    生产者:

    package com.example.demo.producter;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMqProducter {
        public static String url = "tcp://127.0.0.1:61616";
    
        public static void main(String[] args) throws JMSException {
            // 根据用户名 和密码,地址,获取JMS 的连接工厂 61616端口 是mq服务的端口 而8161 是mq提供的管理后端的端口
            ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
            // 从连接工厂创建一条连接
            Connection connection = connetionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答   一个类似 接受 或者发送的线程
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 用session创建一个
            Destination destination = session.createQueue("mq-msg");
            // MessageProducer:消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 设置不持久化 NON_PERSISTENT  PERSISTENT设置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 发送一条消息
            for (int i = 1; i <= 5; i++) {
                sendMsg(session, producer, i);
            }
            connection.close();
    
        }
    
        /**
         * 在指定的会话上,通过指定的消息生产者发出一条消息
         * 
         * @param session
         *            消息会话
         * @param producer
         *            消息生产者
         */
        public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            // 创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
            // 通过消息生产者发出消息
            producer.send(message);
        }
    
    }

    消费者:

    package com.example.demo.consumer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer {
        public static void main(String[] args) throws JMSException {
            // ConnectionFactory :连接工厂,JMS 用它创建连接
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
            // JMS 客户端到JMS Provider 的连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Session: 一个发送或接收消息的线程
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // Destination :消息的目的地;消息发送给谁.
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            Destination destination = session.createQueue("mq-msg");
            // 消费者,消息接收者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println("收到消息:" + message.getText());
                } else
                    break;
            }
            session.close();
            connection.close();
        }
    
    }

    通过:

    Session.AUTO_ACKNOWLEDGE 表示自动签收
     Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE)

    2.手动签收

    生产者:

    package com.example.demo.producter;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMqProducter {
        public static String url = "tcp://127.0.0.1:61616";
    
        public static void main(String[] args) throws JMSException {
            // 根据用户名 和密码,地址,获取JMS 的连接工厂 61616端口 是mq服务的端口 而8161 是mq提供的管理后端的端口
            ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
            // 从连接工厂创建一条连接
            Connection connection = connetionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 一个类似 接受 或者发送的线程
            Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
            // 用session创建一个
            Destination destination = session.createQueue("mq-msg");
            // MessageProducer:消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 设置不持久化 NON_PERSISTENT PERSISTENT设置持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 发送一条消息
            for (int i = 1; i <= 5; i++) {
                sendMsg(session, producer, i);
            }
            connection.close();
    
        }
    
        /**
         * 在指定的会话上,通过指定的消息生产者发出一条消息
         * 
         * @param session
         *            消息会话
         * @param producer
         *            消息生产者
         */
        public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            // 创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
            // 通过消息生产者发出消息
            producer.send(message);
        }
    
    }

    消费者:

    package com.example.demo.consumer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer {
        public static void main(String[] args) throws JMSException {
            // ConnectionFactory :连接工厂,JMS 用它创建连接
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
            // JMS 客户端到JMS Provider 的连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Session: 一个发送或接收消息的线程
            Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
            // Destination :消息的目的地;消息发送给谁.
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            Destination destination = session.createQueue("mq-msg");
            // 消费者,消息接收者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                message.acknowledge();//表示 手动签收,应答,告诉队列消费成功,可以清除消费的消息
                if (null != message) {
                    System.out.println("收到消息:" + message.getText());
                } else
                    break;
            }
            session.close();
            connection.close();
        }
    
    }

    生产者 和消费者中 需要将应该类型修改成:

    Session.CLIENT_ACKNOWLEDGE
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

    消费者 还需要手动签收:

        message.acknowledge();//表示 手动签收,应答,告诉队列消费成功,可以清除消费的消息

    3.事务签收

    生产者:

    package com.example.demo.producter;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMqProducter {
        public static String url = "tcp://127.0.0.1:61616";
    
        public static void main(String[] args) throws JMSException {
            // 根据用户名 和密码,地址,获取JMS 的连接工厂 61616端口 是mq服务的端口 而8161 是mq提供的管理后端的端口
            ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
            // 从连接工厂创建一条连接
            Connection connection = connetionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 一个类似 接受 或者发送的线程
            Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
            // 用session创建一个
            Destination destination = session.createQueue("mq-msg");
            // MessageProducer:消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 设置不持久化 NON_PERSISTENT PERSISTENT设置持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 发送一条消息
            for (int i = 1; i <= 5; i++) {
                sendMsg(session, producer, i);
                session.commit();//生产者提交事务完成以后,该消息才能存放到队列之中
            }
            connection.close();
    
        }
    
        /**
         * 在指定的会话上,通过指定的消息生产者发出一条消息
         * 
         * @param session
         *            消息会话
         * @param producer
         *            消息生产者
         */
        public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            // 创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
            // 通过消息生产者发出消息
            producer.send(message);
        }
    
    }

    消费者:

     1 package com.example.demo.consumer;
     2 
     3 import javax.jms.Connection;
     4 import javax.jms.ConnectionFactory;
     5 import javax.jms.Destination;
     6 import javax.jms.JMSException;
     7 import javax.jms.MessageConsumer;
     8 import javax.jms.Session;
     9 import javax.jms.TextMessage;
    10 
    11 import org.apache.activemq.ActiveMQConnection;
    12 import org.apache.activemq.ActiveMQConnectionFactory;
    13 
    14 public class Consumer {
    15     public static void main(String[] args) throws JMSException {
    16         // ConnectionFactory :连接工厂,JMS 用它创建连接
    17         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
    18         // JMS 客户端到JMS Provider 的连接
    19         Connection connection = connectionFactory.createConnection();
    20         connection.start();
    21         // Session: 一个发送或接收消息的线程
    22         Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
    23         // Destination :消息的目的地;消息发送给谁.
    24         // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
    25         Destination destination = session.createQueue("mq-msg");
    26         // 消费者,消息接收者
    27         MessageConsumer consumer = session.createConsumer(destination);
    28         while (true) {
    29             TextMessage message = (TextMessage) consumer.receive();
    30             session.commit();//表示成功消费 该条信息,应答完成,从队列中清除该消息
    31             if (null != message) {
    32                 System.out.println("收到消息:" + message.getText());
    33             } else
    34                 break;
    35         }
    36         session.close();
    37         connection.close();
    38     }
    39 
    40 }

    生产者 和消费者

    Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);

    开启事务就可以了,至于选择 哪种签收类型,尝试了,自动和 手动都可以,只需要 在生产者和消费者里面 提交事务就可以了。

    并不是所有的消息中间件都是以这种方式保证消息的可行性,只是ActiveMQ 是用了这三种应答机制。

    Springboot+Activemq整合

    1 导入整合所需要的依赖:

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>

    2 创建 application.properties文件

    spring.activemq.broker-url=tcp://127.0.0.1:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    server.port=8080
    queue=myqueue

    3.自定义配置文件QueueConfig 读取配置文件的队列名,根据队列名字创建一个Queue

    package com.example.demo;
    
    import javax.jms.Queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.core.JmsTemplate;
    
    @Configuration
    public class QueueConfig {
    
        @Value("${queue}")
        private String queue;
    
        @Bean
        public Queue logQueue() {
            return new ActiveMQQueue(queue);
        }
    }

    4.创建生产者,可以直接使用提供的模板 JmsMessagingTemplate 进行消息的发送:

    package com.example.demo.producter;
    
    import javax.jms.Queue;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Component;
    
    import com.example.demo.SpringbootActivemqApplication;
    
    @Component
    public class Producter {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Autowired
        private Queue queue;
        private static Logger logger = LoggerFactory.getLogger(
    Producter 
    .class); public void send() { String str = "生产者生产数据:" + System.currentTimeMillis(); jmsMessagingTemplate.convertAndSend(queue, str); logger.info("生产者数据:{}", str); } }

    5.启动类:

    package com.example.demo;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import com.example.demo.producter.Producter;
    import com.example.demo.producter.consumer.Consumer;
    
    @SpringBootApplication
    @EnableScheduling
    public class SpringbootActivemqApplication implements ApplicationListener<ContextRefreshedEvent> {
        @Autowired
        public Producter producter;
        @Autowired
        public Consumer consumer;
    
        public static void main(String[] args) {
            SpringApplication.run(SpringbootActivemqApplication.class, args);
            //onApplicationEvent方法 在启动springboot的时候 会运行该方法,可根据项目实际情况 选择合适调用消息发送方法
    
        }
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            producter.send();
        }
    
    }

    6.启动项目,控制台输出内容:

       

    7.创建消费者,创建消费者比较容易,只需要监听队列就可以:

    package com.example.demo.producter.consumer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Consumer {
    
        @JmsListener(destination = "${queue}")
        public void receive(String msg) {
            System.out.println("监听器收到msg:" + msg);
        }
    
    }

    8.最后结果:

    日志显示先后顺序 是因为选择打印的方式不一样.可以忽略不计

    使用ActivetyMQ注意事项

     在Activetymq中,消费者在接受消息消费的时候,如果程序出现了异常,Activemq会有自动重试机制

     例如:

    package com.example.demo.producter.consumer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Consumer {
    
        @JmsListener(destination = "${queue}")
        public void receive(String msg) {
            System.out.println("监听器收到msg:" + msg);
            int i = 1 / 0;
        }
    
    }

    通过 结果可以看出,该消息会一直重试,在项目中可能出现的异常原因 粗略算两种吧:

    1.需要通过修改代码,重新打包发布的bug,比如空指针,格式异常,这些属于开发人员程序的bug

    2.调用第三方接口的时候,出现超时,或者一段时间内连接不上,如数据库连接超时,过段时间可能就能连上这种

    对于第一种出现的情况,即使Acitemq重试N次,程序还是会错,所以这种情况就可以不需要让Activemq重试了

    对于第二种情况,则有必要让mq重试,数据库连接可能过段时间久恢复了,此时在重试 则可以正常消费。

    对于第一种情况,如果是程序代码引起的异常,重试机制 也起不到作用了,则 可以通过 日志记录 然后人工手动补偿数据,或者说定时job健康检查

    Activemq幂等性和消费者集群:

     什么叫幂等性:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

    举例:在Activemq中,消费者消费消息的时候,数据库里面的age字段的值是1,消费完消息以后 值变成2 ,此时程序有异常抛出。Activemq重试机制,消费者 在去消费该消息的时候,数据库里面的值 应该依旧是2 不会变成3,多次消费同一条消息,不影响数据库的结果.

    package com.example.demo.producter.consumer;
    
    import javax.annotation.Resource;
    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Consumer {
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @JmsListener(destination = "${queue}")
        public void receive(TextMessage message, Session session) throws JMSException, InterruptedException {
    
            String msgid = message.getJMSMessageID();
            String msg = message.getText();
            System.out.println("监听器收到msg:" + msg + "消息id" + msgid);
            // 检测是否存在该消息id的key 如果不存在表示 第一次消费
            if (!redisTemplate.hasKey(msgid)) {
                // 数据库age+1
                int age = 0;
                age++;
                redisTemplate.opsForValue().set(msgid, msgid);
            } else {
                // 如果存在 表示已经消费成功,在次接受该消息的时候,则手动签收 避免在次重试
                message.acknowledge();
            }
        }
    
    }

    session.recover(); 方法 可以告诉Activemq 重试

    消费者集群会不会出现消息被重复消费的情况呢:不会,在一个队列的情况下,队列是知道当前有多少个消费者连接,分发消息的时候不会出现重复消费

    队列或者生产者集群的时候 可能会出现重复消费的情况,可以使用zk避免该情况

    以上都是Activemq的一些基本知识,掌握mq是必要掌握的技能

  • 相关阅读:
    高精度模板(未完待续)
    $CH0201$ 费解的开关
    $POJ2288$ $Islands$ $and$ $Bridges$
    luoguP1445 [Violet]樱花
    P3694 邦邦的大合唱站队
    [NOI2009]管道取珠
    [AHOI2006]基因匹配
    luogu P3411 序列变换
    HNOI2001 产品加工
    牛客ACM赛 B [小a的旅行计划 ]
  • 原文地址:https://www.cnblogs.com/920913cheng/p/10609568.html
Copyright © 2011-2022 走看看