zoukankan      html  css  js  c++  java
  • 消息队列 RabbitMQ

    消息队列,即MQ,Message Queue。

    消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    结合前面所说的问题:

    • 商品服务对商品增删改以后,无需去操作索引库或静态页面,只是发送一条消息,也不关心消息被谁接收。

    • 搜索服务和静态页面服务接收消息,分别去处理索引库和静态页面。

    如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。

     

    AMQP和JMS

    MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

     

    两者间的区别和联系:

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

     

    引入 依赖 

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    建立RabbitMQ连接的工具类,方便其他程序获取连接:

    public class ConnectionUtil {
        /**
         * 建立与RabbitMQ的连接
         * @return
         * @throws Exception
         */
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("192.168.56.101");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/leyou");
            factory.setUsername("leyou");
            factory.setPassword("leyou");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }

     创建连接工具类 

    public class ConnectionUtil {
        /**
         * 建立与RabbitMQ的连接
         * @return
         * @throws Exception
         */
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("192.168.56.101");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/test");
            factory.setUsername("test");
            factory.setPassword("test");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }

     

    五种消息模型

    基本消息模型

    RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

    RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

    P(producer/ publisher):生产者,一个发送消息的用户应用程序。

    C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

    队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

    总之:

    生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

     

    生产者发送消息

    public class Send {
    
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道,这是完成大部分API的地方。
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列,必须声明队列才能够发送消息,我们可以把消息发送到队列中。
            // 声明一个队列是幂等的 - 只有当它不存在时才会被创建
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    点击队列名称,进入详情页,可以查看消息:

     

    消费者获取消息

    public class Recv {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    这个时候,队列中的消息就没了

    消息确认机制(ACK)

    通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

    那么问题来了:RabbitMQ怎么知道消息被接收了呢?

    如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

    因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

    • 自动ACK:消息一旦被接收,消费者自动发送ACK

    • 手动ACK:消息接收后,不会发送ACK,需要手动调用

    大家觉得哪种更好呢?

    这需要看消息的重要性:

    • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

    • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

    我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:

     

    自动ACK存在的问题

    运行消费者,程序抛出异常。但是消息依然被消费:

    手动ACK

    public class Recv2 {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                    // 手动进行ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列,第二个参数false,手动进行ACK
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    自动ACK存在的问题

    运行消费者,程序抛出异常。但是消息依然被消费:

     

    work消息模型

    工作队列或者竞争消费者模式

     

     工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取

    避免消息堆积?

    1)采用workqueue,多个消费者监听同一队列。

    2)接收到消息以后,而是通过线程池,异步消费。

     

    生产者

    public class Send {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 循环发布任务
            for (int i = 0; i < 50; i++) {
                // 消息内容
                String message = "task .. " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 2);
            }
            // 关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    消费者1

     消费者2

    与消费者1基本类似,就是没有设置消费耗时时间。

    这里是模拟有些消费者快,有些比较慢。

     

    接下来,两个消费者一同启动,然后发送50条消息:

    可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

     

    能者多劳

    现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

     我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

     

    订阅模型分类

    将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。

    1、1个生产者,多个消费者

    2、每一个消费者都有自己的一个队列

    3、生产者没有将消息直接发送到队列,而是发送到了交换机

    4、每个队列都要绑定到交换机

    5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

    X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

    Exchange类型有以下几种:

    Fanout:广播,将消息交给所有绑定到交换机的队列

    Direct:定向,把消息交给符合指定routing key 的队列

    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    订阅模型-Fanout

    生产者

    • 声明Exchange,不再声明Queue

    • 发送消息到Exchange,不再发送到Queue

    public class Send {
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            
            // 声明exchange,指定类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            
            // 消息内容
            String message = "Hello everyone";
            // 发布消息到Exchange
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [生产者] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

    消费者1

    public class Recv {
        private final static String QUEUE_NAME = "fanout_exchange_queue_1";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    队列需要和交换机绑定

    public class Recv2 {
        private final static String QUEUE_NAME = "fanout_exchange_queue_2";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    订阅模型-Direct

    在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。

    在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

     

    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

     

    生产者

    public class Send {
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "商品新增了, id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

    消费者1

    public class Recv {
        private final static String QUEUE_NAME = "direct_exchange_queue_1";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    消费者2

    public class Recv2 {
        private final static String QUEUE_NAME = "direct_exchange_queue_2";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    订阅模型-Topic

    Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

    通配符规则:

    `#`:匹配一个或多个词

    `*`:匹配不多不少恰好1个词

    举例:

    `audit.#`:能够匹配`audit.irs.corporate` 或者 `audit.irs`

    `audit.*`:只能匹配`audit.irs`

    生产者

    public class Send {
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为topic
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 消息内容
            String message = "新增商品 : id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

    消费者1

    public class Recv {
        private final static String QUEUE_NAME = "topic_exchange_queue_1";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    消费者2

    /**
     * 消费者2
     */
    public class Recv2 {
        private final static String QUEUE_NAME = "topic_exchange_queue_2";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    持久化

    如何避免消息丢失?

    1) 消费者的ACK机制。可以防止消费者丢失消息。

    2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

     

    是可以将消息进行持久化呢?

    要将消息持久化,前提是:队列、Exchange都持久化

     

    交换机持久化 

     队列持久化 

     消息持久化

     

     

     

  • 相关阅读:
    【leetcode】Binary Search Tree Iterator
    【leetcode】Palindrome Partitioning II
    【leetcode】Best Time to Buy and Sell Stock III
    【leetcode】Best Time to Buy and Sell Stock II
    【leetcode】Longest Consecutive Sequence
    【leetcode】Factorial Trailing Zeroes
    【leetcode】Simplify Path
    【leetcode】Generate Parentheses
    【leetcode】Combination Sum II
    【leetcode】Combination Sum
  • 原文地址:https://www.cnblogs.com/qin1993/p/12650175.html
Copyright © 2011-2022 走看看