zoukankan      html  css  js  c++  java
  • 消息队列MQ

    1.定义和分类

    1.1定义

    MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。

    它是典型的生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    1.2分类

    其常见的消息队列产品如下表:

    名称 开发语言 时效性 说明
    ActiveMQ java 毫秒级 基于JMS,是Apache出品,最流行的,能力强劲的开源消息总线
    RabbitMQ

    Erlang

    微妙级

    基于AMQP协议,erlang语言开发,稳定性好

    Kafka Scala 毫秒级 分布式消息系统,高吞吐量。是Apache下的一个子项目
    RocketMQ java 毫秒级 基于JMS,阿里巴巴产品,目前交由Apache基金会

    具体的介绍见后续章节。它们的作用是异步处理,应用解耦,流量消峰。

    1.3JMS与AMQP的异同

    1)JMS全称是java message service,即java消息服务。 AMQP全称是Advanced Message Queuing Protocol,即高级消息队列协议;

    2)JMS定义了统一的接口,来对消息操作进行统一;AMQP通过规定协议来统一数据交互的格式;

    3)JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的;

    4)JMS规定了两种消息模型;而AMQP的消息模型更加丰富。

    2.ActiveMQ

    2.1下载与安装

    官网:http://activemq.apache.org/

    1)打开后点击最新版本

     2)在新的页面选择linux对应的文件下载,这里以5.16.2为例说明

    3)将apache-activemq-5.16.2-bin.tar.gz 上传到Linux上,默认Linux上已安装jdk8及以上版本。

    4)解压

    tar  zxvf  apache-activemq-5.16.2-bin.tar.gz

    5)给目录赋权

    chmod 777 apache-activemq-5.16.2
    cd apache-activemq-5.16.2in
    chmod 755 activemq 

    6)修改配置文件,允许外部访问

    cd ..
    vi conf/jetty.xml

    把127.0.0.1改为本机ip,如下图:

     7)启动

    cd bin
    ./activemq start

    看到类似下图的结果,说明启动成功

    停止命令

    ./activemq stop

    重启命令

    ./activemq restart

     8)进入管理页

    假设服务器地址为192.168.86.128 ,打开浏览器输入地址,http://192.168.86.128:8161/ 即可进入ActiveMQ管理页面。在进入时需要输入用户名和密码,均是admin,进入后如下图

     9)进入主界面。

    其中端口8161是管理页面,用户名密码都是admin;61616是服务端页面,用户名密码是guest。

    2.2消息模式

    1)点对点模式

    每个消息只有一个消费者。一旦被消费,消息就不再在消息队列中。发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。但接收者在成功接收消息之后需向队列应答成功。

    2)发布/订阅模式

    发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。也就是说这种模式每个消息可以有多个消费者,发布者和订阅者之间有时间上的依赖性。另外订阅者必须保持运行的状态,才能接受发布者发布的消息。

    2.3 基本入门

    此代码不在源码中呈现,只做说明。

    2.2.1环境准备

    新建一个普通的maven工程,导入依赖

            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.13.4</version>
            </dependency>

    2.2.2点对点模式

    1)创建生产者QueueProducer

    package com.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class QueueConsumer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.16886.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(queue);
    
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    运行后通过ActiveMQ管理界面Queues菜单查询,发现有一个消息等待消费

    2)创建消费者QueueConsumer 

    package com.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class QueueConsumer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(queue);
    
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    运行之后再控制台打印了发送过来的消息,管理页面的消息也被消费了。

    2.2.3发布/订阅模式

    1)创建生产者TopicProducer

    package com.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TopicProducer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇jms世界");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

    2)创建消费者QueueConsumer 

    package com.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TopicConsumer1 {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
    
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息1:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

     再创建一个消费者,开启这两个消费者,然后运行生产者,会发现两个消费者会接收到消息。如果在生产者开启之后开启消费者,那么消费者是收不到消息的。在页面选择Topics查询消息

    2.4整合SpringBoot

    源码:https://github.com/zhongyushi-git/mq-collections.git。

    1)新建两个SpringBoot的项目,一个作为服务生产者,一个作为服务消费者

    2)在生产者中导入依赖

            <!--activemq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>

    3)编写生产者配置文件

    # 应用名称
    spring.application.name=springboot-activemq-provider-demo
    # 端口号
    server.port=10010
    # 配置服务器地址
    spring.activemq.broker-url=tcp://192.168.86.128:61616
    # activemq登录名和密码
    spring.activemq.user=admin
    spring.activemq.password=admin
    # activemq模式,false点对点模式,true发布订阅模式
    spring.jms.pub-sub-domain=true
    # 队列名称
    activemq.queue=queue-msg
    # 主题名称
    activemq.topic=topic-msg

    4)编写生产者配置类

    package com.zxh.springbootactivemqproviderdemo.config;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    @Configuration
    public class ActiveMQConfig {
    
        @Value("${activemq.queue}")
        private String activemqQueue;
    
        //定义存放消息的队列
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(activemqQueue);
        }
    }

    5)编写生产者controller接口

    package com.zxh.springbootactivemqproviderdemo.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    @RestController
    public class ProviderController {
     
        //注入存放消息的队列
        @Autowired
        private Queue queue;
    
     
        //注入springboot封装的工具类
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        /**
         * 点对点
         * @param name
         */
        @GetMapping("queue-send")
        public void queueSend(String name) {
            //发送消息
            jmsMessagingTemplate.convertAndSend(queue, name);
        }
    
    }

    6)编写消费者配置文件

    # 应用名称
    spring.application.name=springboot-activemq-demo
    # 端口号
    server.port=10011
    # 配置服务器地址
    spring.activemq.broker-url=tcp://192.168.86.128:61616
    # activemq登录名和密码
    spring.activemq.user=admin
    spring.activemq.password=admin
    # activemq模式,false点对点模式,true发布订阅模式
    spring.jms.pub-sub-domain=true
    # 队列名称
    activemq.queue=queue-msg
    # 主题名称
    activemq.topic=topic-msg

    7)编写消费者监听器

    package com.zxh.springbootactivemqconsumerdemo.listener;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    
    @Component
    public class ConsumerListener {
    
        @JmsListener(destination = "${activemq.queue}")
        public void receiveQueue(Message message) {
            if(message instanceof TextMessage){
                TextMessage textMessage= (TextMessage) message;
                try {
                    System.out.println("收到的 queue message 是:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
    }

    8)先启动生产者,再启动消费者,然后在浏览器输入http://localhost:10010/queue-send?name=123,即可在消费者的控制台看到接收到的消息。

    到这为止是点对点模式的消息,那么发布订阅模式和其极为类似,在上述的代码上稍作修改便可。

    9)同时修改配置文件,把activemq模式改为发布订阅模式,添加主题名称

    spring.jms.pub-sub-domain=true
    # 主题名称
    activemq.topic=topic-msg

    对于spring.jms.pub-sub-domain的值有两种,分别是true和false,为true时就指定了是发布订阅模式,为false时就指定了是点对点模式。

    10)在生产者的配置类添加一个Bean

    @Value("${activemq.topic}")
    private String activemqTopic;
    
    
     //定义存放消息的主题
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(activemqTopic);
    }

    11)在生产者的controller接口添加一个接口用作发布订阅接口

        //注入存放消息的队列
        @Autowired
        private Topic topic;  
    
         /**
         * 发布订阅
         * @param name
         */
        @GetMapping("topic-send")
        public void topicSend(String name) {
            //发送消息
            jmsMessagingTemplate.convertAndSend(topic, name);
        }

    12)在消费者的监听器类添加发布订阅模式的监听

        @JmsListener(destination = "${activemq.topic}")
        public void receiveTopic(Message message) {
            if(message instanceof TextMessage){
                TextMessage textMessage= (TextMessage) message;
                try {
                    System.out.println("收到的 topic message 是:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }
        }

    13)先启动消费者,再启动生产者,然后在浏览器输入http://localhost:10010/topic-send?name=454545,即可在消费者的控制台看到接收到的消息,这便是发布/订阅模式。

    2.5 JMS消息组成

    2.5.1JMS协议组成

    结构 说明
    JMS Provider 消息中间件
    JMS Producer  消息生产者
    JMS Consumer 消息消费者
    JMS Message 消息

    其中JMS Message是其最主要的部分,介绍如下。

    2.5.2JMS消息组成

    其有三部分组成,分别是消息头、消息体、消息属性。

    1)消息头

    JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

    名称 描述 设置者
    JMSDestination 消息发送的目的地,是一个Topic或Queue send
    JMSMessagelD 消息ID,需要以ID:开头。不可篡改 send
    JMSDeliveryMode 消息的发送模式,分为NON_PERSISTENT(持久化的)和PERSISTENT(非持久化的) send
    JMSTimestamp  消息发送时的时间,可以理解为调用send()方法时的时间 send
    JMSCorrelationID 关联的消息ID client
    JMSReplyTo 消息回复的目的地 client
    JMSType 消息的类型 client
    JMSExpiration  消息失效的时间,值0表明消息不会过期,默认值为0 send 
    JMSPriority 息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先与普通级发送。不可篡改 send

    2)消息体

    消息体就是真实发送的消息,包含5中格式。下面的演示代码均以点对点模式为例。在消息生产者的controller中注入JmsTemplate

     @Autowired
     private JmsTemplate jmsTemplate;

    原因是JmsMessagingTemplate无法通过指定消息类型来发送消息。

    A:TextMessage字符串消息

        /**
         * 文本类型
         */
        @GetMapping("text")
        public void textMessage(){
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("我是文本消息");
                }
            });
        }

    这种方式和上述整合SpringBoot时的方式是一样的,而在消费者端也是只获取的文本类型的消息,因此接收消息的代码不用再次编写。

    B:MapMessage键值对消息

    键值对也就是map类型,可以设置key和value,通过set类型设置值,get类型获取值。

        /**
         * map类型
         */
        @GetMapping("map")
        public void mapMessage(){
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    MapMessage mapMessage = session.createMapMessage();
                    mapMessage.setString("name","张三");
                    mapMessage.setInt("age",20);
                    return mapMessage;
                }
            });
        }

    在接收时需要进行类型的判断,可参考TextMessage,其他类同。详细代码见源码:

    MapMessage mapMessage = (MapMessage) message;
    System.out.println("收到message是:" + mapMessage.getString("name") + "," + mapMessage.getInt("age"));

    C:ObjectMessage序列化的java对象消息

    在生产者方创建User对象,并把此对象复制到消费者方。需要注意的是,对象一定要进行序列化操作,否则无法发送成功

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User implements Serializable {
        private String username;
        private Integer age;
        private String password;
    }

    发送消息:

        /**
         * object类型
         */
        @GetMapping("obj")
        public void ObjectMessage(){
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    ObjectMessage message = session.createObjectMessage();
                    User user=new User("admin",20,"1234");
                    message.setObject(user);
                    return message;
                }
            });
        }

    接收消息:

    ObjectMessage objectMessage = (ObjectMessage) message;
    User user = (User) objectMessage.getObject();
    System.out.println("收到message是:" + user.toString());

    在向消费者发送消息时,发发生异常,也就是说对象不被activemq信任。

     那么就需要在配置文件进行配置,信任自定义对象。需要生产者和消费者方都配置

    #配置信任列表
    spring.activemq.packages.trust-all=true

    D:BytesMessage字节流消息

    以图片的发送为例。发送消息:

        /**
         * byte类型
         */
        @GetMapping("byte")
        public void BytesMessage(){
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    BytesMessage message = session.createBytesMessage();
                    try {
                        File file = new File("C:\Users\zhongyushi\Pictures\1.jpg");
                        FileInputStream stream=new FileInputStream(file);
                        byte[] bytes = new byte[(int)file.length()];
                        stream.read(bytes);
                        message.writeBytes(bytes);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return message;
                }
            });
        }

    接收消息:

    BytesMessage bytesMessage = (BytesMessage) message;
    byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
    bytesMessage.readBytes(bytes);
    FileOutputStream fileOutputStream = new FileOutputStream("D://1.jpg");
    fileOutputStream.write(bytes);
    System.out.println("保存了");

    上述的操作相当于图片的复制。

    E:StreamMessage字符流消息

    可以发送任何类型的值,只是它没有key只有value。

    发送消息:

       /**
         * stream类型
         */
        @GetMapping("stream")
        public void streamMessage(){
            jmsTemplate.send(queue, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    StreamMessage streamMessage = session.createStreamMessage();
                    streamMessage.writeString("张三");
                    streamMessage.writeInt(20);
                    return streamMessage;
                }
            });
        }

    接收消息:

    StreamMessage streamMessage = (StreamMessage) message;
    System.out.println("收到message是:" + streamMessage.readString() + "," + streamMessage.readInt());

    需要注意的是,这种类型的消息,同一种类型(string,int等)可设置多次,后面设置的数据会拼接在前面设置的数据的后面,使用逗号分隔。

    3)消息属性

    主要是给消息设置自定义的属性,实现对消息的过滤。以文本消息为例说明:

    在发送消息时设置属性:

    TextMessage textMessage = session.createTextMessage("我是文本消息");
    textMessage.setStringProperty("订单","order");
    return textMessage;

    在接收消息时获取属性:

    TextMessage textMessage = (TextMessage) message;
    System.out.println("自定义属性:"+textMessage.getStringProperty("订单"));
    System.out.println("收到message是:" + textMessage.getText());

    2.6消息持久化

    消息持久化是保证消息不丢失的重要方式。ActiveMQ提供了三种消息的存储方式:基于内存的消息存储、基于日志的消息存储、基于JDBC的消息存储。

    2.6.1基于内存的消息存储

    会把消息存储到内存中,当ActiveMQ服务重启后消息会丢失,不推荐使用。

    2.6.2基于日志的消息存储

    对于SpringBoot的架构,ActiveMQ默认就使用日志存储。KahaDB是ActiveMQ默认的日志存储方式。

    需要在生产者配置文件进行配置

    # 消息持久化配置
    spring.jms.template.delivery-mode=persistent

    配置这种方式后,日志是默认存储在ActiveMQ安装目录下的data/kahadb目录下。

    2.6.3、基于JDBC的消息存储

    可以把消息存储到数据库中。同样也需要在生产者配置文件进行持久化的配置。

    1)修改ActiveMQ安装目录下的conf/activemq.xml文件,

    添加数据源:这里数据库采用mysql,数据源使用的是windows本地的数据库(需要开启远程访问,关闭本机防火墙)

    <bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://192.168.86.1:3306/db_activemq"/>
            <property name="username" value="root"/>
            <property name="password" value="zys123456"/>
            <property name="poolPreparedStatements" value="true"/>
    </bean>

    指定使用数据源:把原来记录日志方式改为指定jdbc方式

     <persistenceAdapter>
              <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
              <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
     </persistenceAdapter>

    2)复制mysql驱动和数据库连接池druid的ajr到ActiveMQ的lib目录,版本可自主选择

    3)在本机新建数据库db_activemq

    4)重启ActiveMQ服务,然后再启动生产者,此时会在数据库自动创建三个表

     

    5)调用接口发送一条消息。打开表active_msgs,看到有一条消息待消费

    6)启动消费者,接收到消息,再次查看表active_msgs,待消费的一条记录已被删除

    2.7消息事务

    消息事务,是保证消息传递原子性的一个重要特征。一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

    2.7.1案例说明

    先看下面的两个案例。

    1)案例1:循环发送数据,无异常

        @GetMapping("text2")
        public void textMessage2() {
            for (int i = 0; i < 10; i++) {
                jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
            }
        }

    这种情况下10数据都可以发送成功

    1)案例2:循环发送数据,有异常

        @GetMapping("text2")
        public void textMessage2() {
            for (int i = 0; i < 10; i++) {
                if (i == 5) {
                    int a = i / 0;
                }
                jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
            }
        }

    这种情况下前5条数据都可以发送成功,发生异常后后面5条数据不再发送了。那么这对消息的发送还是有影响了,违背了原子性的原则。

    2.7.2生产者事务处理

    针对上述的文件,SpringBoot提供了解决办法。

    1)在配置类中添加jms事务管理器

        /**
         * jms事务管理器
         * @param connectionFactory
         * @return
         */
        @Bean
        public PlatformTransactionManager createTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory){
            return new JmsTransactionManager(connectionFactory);
        }

    2)在发送消息的方法上添加注解@Transactional。

    一般情况下,发送消息都在service层进行,这里方便演示便在controller层。@Transactional就是事务管理的注解,当发生异常时会自动回滚。它也可以用在数据库的事务操作上。若同时存在于消息和数据的操作,那么事务会对它们同时生效。

    2.7.3消费者事务处理

    当在接收消息时,正常接收时应该提交事务,发生异常时回滚事务。在回滚时,MQ会重发6次消息,当6次都失败时,会把此消息自动添加到死信队列。以接收文本消息为例,代码如下:

    @JmsListener(destination = "${activemq.queue}")
        public void receiveQueue(Message message,Session session) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("收到message是:" + textMessage.getText());
                    //提交事务
                    session.commit();
                }
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    session.rollback();
                } catch (JMSException jmsException) {
                    jmsException.printStackTrace();
                }
            }
        }

    在接收消息时添加了Session参数,来控制事务。

    2.8消息确认机制

    常用的消息确认机制有两种,分别是自动确认(Session.AUTO_ACKNOWLEDGE)和手动确认(Session.CLIENT_ACKNOWLEDGE)。

    在SpringBoot的架构中,开启了事务,那么消息是自动确认的,不需要再进行确认。

    2.9消息投递方式

    投递方式有四种,同步、异步、延迟、定时投递。

    2.9.1同步投递

    消息生产者使用持久传递模式发送消息时,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者,这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。

    2.9.2异步投递

    如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞Producer.send方法。

    2.9.3延迟投递

    消息延迟发送到服务器。

    2.9.4定时投递

    采用定时方式发送消息到服务器。

    2.10死信队列

    用来保存处理失败或者过期的消息。

    3.RabbitMQ

    由于篇幅问题,请参考博客https://www.cnblogs.com/zys2019/p/12828152.html

    4.Kafka

    由于篇幅问题,请参考博客https://www.cnblogs.com/zys2019/p/13202787.html

    就是这么简单,你学废了吗?感觉有用的话,给笔者点个赞吧 !
  • 相关阅读:
    SQL Server 2008的审核功能
    在SSMS(2008)中对数据库对象进行搜索和筛选
    关于在ASP.NET应用程序中异步调用Web Service的解决方案
    SSIS 2008中的Data Profiler任务
    开始Windows Embedded之旅
    在Access中计算两个日期之间的工作日天数
    当PrintForm遇到"RPC服务不可用的错误”
    REST 的知识 【转载】
    在C#中实现类似Eval这类动态求值的函数【转载】
    行内数据
  • 原文地址:https://www.cnblogs.com/zys2019/p/14750856.html
Copyright © 2011-2022 走看看