zoukankan      html  css  js  c++  java
  • RabbitMQ的使用(一)_JavaClient实现简单模式+Work模式

    RabbitMQ的使用

    rabbitmq的官网教程地址:https://www.rabbitmq.com/getstarted.html

    1.RabbitMQ定义:是一款开源的消息代理的队列服务器,是一种应用程序之间的通信方法。RabbitMQ是基于Erlang语言来编写的,基于AMQP协议的队列。你可以把RabbitMQ想象成一个邮箱。发送人投递消息到邮箱中。接收者从邮箱中取出消息的过程

       RabbitMQ负责接收、存储、转发消息。

    2.RabbitMQ的使用场景:异步处理、流量削峰、应用解耦、应用日志

    3.RabbitMQ中的几个概念:

      生产者:发送消息的程序就是生产者

      消费者:就是接收消息的那一方,负责对接收到的消息进行处理

      队列:队列是存储消息的缓冲区。

      交换机:一方面它接收来自生产者的消息,另一方面它将消息推入队列

      绑定建:交换机和队列之间的绑定关系。

    3.消息模式的种类:

      1.简单模式

      2.Work模式(Work queues):

      3.订阅模式(Publish/Subscribe)

      4.路由模式( Routing)

      5.Topics

      6.RPC

      7.Publisher Confirms

      官网图片如下:

      

      

    4.交换机种类:direct、topic、fanout、headers四种。

      消息只存储在队列中,队列只受主机内存和磁盘的限制,它本质上是一个大的消息缓冲区。许多生产者可以把消息发送到一个队列,许多消费者也可以尝试从一个队列接收数据。

    5.Java Client使用简单模式:

         模式图:一个生产者、一个队列、一个消费者

      

        5.1:在Rabbitmq服务器上创建一个/yingxiaocao的虚拟主机,并为这个主机添加一个yingxiaocao的用户,提供访问权限。

       5.2 :导入依赖包:with the groupId com.rabbitmq and the artifactId amqp-client    

      <dependencies>
            <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>
    
        </dependencies>
    View Code

      5.3 :创建连接工厂

    public class RabbitMQConnectionFactory {
    
        public static Connection getRabbitMQConnections() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(CommonConstant.ADD_RESS);
            connectionFactory.setPort(CommonConstant.PORT);
            connectionFactory.setUsername(CommonConstant.USER_NAME);
            connectionFactory.setPassword(CommonConstant.PASSWORD);
            connectionFactory.setVirtualHost(CommonConstant.VIRTUAL_HOST);
            return connectionFactory.newConnection();
        }
    }
    View Code

      5.4 创建生产者

    public class RabbitSender {
        // 声明一个队列的名字
        private static final String SIMPLE_QUEUE_NAME = "hello_world_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.获取一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.创建一个通道
            Channel channel = rabbitMQConnections.createChannel();
            /*
                3.要发送消息,我们必须声明一个要发送的队列;然后我们可以向队列发布一条消息
                参数1:queue:队列的名字
                参数2:durable: 队列是否做持久化操作,true表示队列做持久化操作,该队列将在服务器重启后,继续存在
                参数3:exclusive:
                参数4:autoDelete:是否声明自动删除
                参数5:arguments 队列参数
             */
            channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
            // 3.声明要发送的消息
            String sendMsg = "小河流水哗啦1啦";
            /*
             * 4.发送消息
             *  参数1:exchange 交换机的名字,简单模式不需要交换机,默认的就好
             *  参数2:routingKey
             *  参数3:props 属性
             *  参数4:要发送的消息体
             */
            channel.basicPublish("", SIMPLE_QUEUE_NAME, null, sendMsg.getBytes());
            channel.close();
            rabbitMQConnections.close();
        }
    
    }
    View Code

      5.5 创建消费者

    public class RabbitMQReceiver {
    
        // 声明一个队列的名字
        private static final String SIMPLE_QUEUE_NAME = "hello_world_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(SIMPLE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    或者使用

    这个来接收消息        
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("收到的消息是:===>" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);//告诉服务器我们收到消息, true代表拒收消息
                }
            };
            //如果设置自动应答为false,那么我们必须手动告诉服务器我收到消息了,否则下次消费者重启会再次收到之前的消息
            channel.basicConsume(QUEUENAME, false, defaultConsumer);
    View Code

      

    5.6 测试结果,每发送一条消息,消费者就能收到一条

      

    6.Java Client使用Work模式

      work模式图:一个生产者+一个队列+多个消费者

           

        修改生产者代码,由于没有实际的任务,使用Thread.sleep()来模拟一个复杂的任务。

    public class WorkRabbitSender {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.获取一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.创建一个通道
            Channel channel = rabbitMQConnections.createChannel();
            /*
                3.要发送消息,我们必须声明一个要发送的队列;然后我们可以向队列发布一条消息
                参数1:queue:队列的名字
                参数2:durable: 队列是否做持久化操作,true表示队列做持久化操作,该队列将在服务器重启后,继续存在
                参数3:exclusive:
                参数4:autoDelete:是否声明自动删除
                参数5:arguments 队列参数
             */
            channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
            // 3.声明要发送的消息
            /*
             * 4.发送消息
             *  参数1:exchange 交换机的名字,简单模式不需要交换机,默认的就好
             *  参数2:routingKey
             *  参数3:props 属性
             *  参数4:要发送的消息体
             */
            for (int i=1;i<=100;i++) {
                // 由于没有实际的任务,用Thead.sleep()来表示发送复杂的字符串
                String sendMsg = "小河流水哗啦1啦========>"+i;
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicPublish("", WORK_QUEUE_NAME, null, sendMsg.getBytes());
            }
            System.out.println("消息发送完成");
            channel.close();
            rabbitMQConnections.close();
        }
    
    }
    View Code

      同样修改消费者代码。每个消息被消费后,同样使用Thread.sleep()休息一下。它将处理传递的消息并执行任务,现在使用自动应答的方式,消费者C1

    public class WorkRabbitMQReceiver {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

       创建一个消费者C2

    public class WorkRabbitMQReceiver2 {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

     默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询

    效果图如下:

     轮询的方式的弊端。例如上述代码。假设执行代码消耗时间忽略不计,以线程睡眠时间为消费者处理业务需要的时间。那么对于消费者C1来说,每次执行消息需要休息1s,而消费者C2每次执行消息休息1ms。显然,消费者C2的处理速度远快于消费者C1,在这种情况下,轮询的方式,仍然会使两台服务器获取到相同的消息数。而我们的RabbitMQ服务器并不知道,两台服务器的各自处理速度。这就会造成一些性能损失。这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将第n个消息发送给第n个消费者

    为了解决这种情况:我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个消费者发送一条以上的消息。或者,换句话说,在消费者处理并确认之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的消费者

    生产者代码 维持不变,消费者代码,加上如下两句话

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    View Code
    basicQos根据情况设置:消费者C1设置为1,消费者C2设置为3
    注意:这时候,不能使用自动应答的方式,而是应改为手动应答的方式。否则还是轮询的接收方式。自动应答,是消息被发送出去之后,不管消费者是否消费成功,都被rabbitmq认为是已消费完成。然后就会发送下一条消息给消费者
    修改消费者C1代码:
    public class WorkRabbitMQReceiver {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            channel.basicQos(1);
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    修改消费者C2代码:

    public class WorkRabbitMQReceiver2 {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
            channel.basicQos(5);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    效果图如下:

    很显然,不再是轮询的方式接收消息了。

    
    

    7.消息确认机制:

    如果一个消费者开始了一个很长的任务,但是只执行了一半,消费者就死掉了。如果使用自动应答的方式,一旦RabbitMQ向消费者发送了一条消息,它就会立即将其标记为删除。在这种情况下,如果消费者死掉了,我们将丢失它正在处理的信息。我们还将丢失所有发送给这个特定消费者但尚未处理的消息。但是这样的话,程序就会有问题。为了确保消息不会丢失,RabbitMQ支持消息确认。一个确认被消费者发送回来告诉RabbitMQ一个特定的消息已经被接收,被处理并且RabbitMQ可以自由地删除它。

    如果消费者在没有发送ack的应答的情况下死亡(它的通道关闭了,连接关闭了,或者TCP连接丢失了),RabbitMQ将理解消息没有被完全处理,并将其重新排队。如果同时有其他消费者在线,它就会迅速地将其重新发送给其他消费者。这样你就可以确保没有信息丢失。前面的列子中,我将autoAck=true表示自动应答,现在只需要将该参数改为false,表示手动应答。上述代码已经演示了

    8.消息持久化操作

    我们已经学会了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

    当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要这样做。要确保消息不丢失,需要做两件事:我们需要将队列和消息标记为持久的。

    首先,我们需要确保队列在RabbitMQ节点重新启动时能够存活。为此,我们需要将其声明为持久的

    boolean durable = true;
    channel.queueDeclare("work_queue", durable, false, false, null);

    这个命令本身是正确的,但是它不能在我们目前的设置中工作。这是因为我们已经定义了一个名为work_queue的队列,它不是持久的。RabbitMQ不允许你用不同的参数重新定义一个已有的队列,如果这样做了都会返回一个错误。

    错误信息如下:

    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'work_queue' in vhost '/yingxiaocao': received 'true' but current is 'false', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
    View Code

    解决办法:声明一个具有不同名称的队列

    boolean durable = true;
    channel.queueDeclare("durable_work_queue", durable, false, false, null);

    在这一点上,我们确信即使RabbitMQ重启,durable_work_queue队列也不会丢失。现在我们需要将消息标记为持有的。

    可通过将MessageProperties设置为PERSISTENT_TEXT_PLAIN

    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());

     修改生产者代码,设置队列持久化操作。消息持久化操作,如下

    public class WorkRabbitSender {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "durable_work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.获取一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.创建一个通道
            Channel channel = rabbitMQConnections.createChannel();
            /*
                3.要发送消息,我们必须声明一个要发送的队列;然后我们可以向队列发布一条消息
                参数1:queue:队列的名字
                参数2:durable: 队列是否做持久化操作,true表示队列做持久化操作,该队列将在服务器重启后,继续存在
                参数3:exclusive:
                参数4:autoDelete:是否声明自动删除
                参数5:arguments 队列参数
             */
            channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
            // 3.声明要发送的消息
            /*
             * 4.发送消息
             *  参数1:exchange 交换机的名字,简单模式不需要交换机,默认的就好
             *  参数2:routingKey
             *  参数3:props 属性
             *  参数4:要发送的消息体
             */
            for (int i=1;i<=100;i++) {
                // 由于没有实际的任务,用Thead.sleep()来表示发送复杂的字符串
                String sendMsg = "小河流水哗啦1啦========>"+i;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicPublish("", WORK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, sendMsg.getBytes());
            }
            System.out.println("消息发送完成");
            channel.close();
            rabbitMQConnections.close();
        }
    
    }
    View Code

    修改消费者。设置队列持久化操作。生产者和消费者的队列参数要一样,不然还是会报上述错误

    消费者C1代码如下:

    public class WorkRabbitMQReceiver {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "durable_work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            channel.basicQos(1);
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    消费者C2代码如下:

    public class WorkRabbitMQReceiver2 {
        // 声明一个队列的名字
        private static final String WORK_QUEUE_NAME = "durable_work_queue";
    
        /**
         * 消费者这一方的使用,和生产者一样,都是创建一个链接,打开一个通道。然后从队列中获取消息
         * DeliverCallback接口来缓冲由服务器推送给我们的消息。
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建一个链接
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.打开一个通道
            final Channel channel = rabbitMQConnections.createChannel();
            // 3.声明一个队列:我们在这里也声明了队列。因为我们可能在启动生产者之前启动消费者,所以我们希望在尝试使用来自该队列的消息之前确保该队列存在。
            channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
            channel.basicQos(5);
            // 4.服务器的消息是异步发送给我们的,所以需要提供一个回调。这个回调将缓冲消息,直到我们使用收到的消息
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + message);
                    try {
                        Thread.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 消费消息,参数1:队列名称 参数2:true标识自动应答,如果设置为false,rabbitmq还会继续推送消息给消费者 参数三:标识回调
            channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    模拟生产者发送完消息,rabbitmq死掉。然后rabbitmq重启。启动消费者,看能否接收到rabbitmq死掉前生产者发送的消息

     效果如下:

    可见,rabbitmq服务器死掉前, 发送消息的时间为7:31.而rabbitmq重启后,启动消费者,接收到 消息的时间是7:34.可见,rabbitmq死掉了,生产者发送的消息,并没有消失,而是被队列持久化了

  • 相关阅读:
    【POJ1961 Period】【KMP】
    浅谈KMP算法
    【关于动态开点线段树】
    【POJ3349 Snowflake Snow Snowflakes】【Hash表】
    【NOI 2002 银河英雄传说】【带权并查集】
    路径问题
    group_concat函数详解
    MySQL中GROUP_CONCAT中排序
    怎么实现CSS限制字数,超出部份显示点点点.
    jsp去掉小数点
  • 原文地址:https://www.cnblogs.com/yingxiaocao/p/13289637.html
Copyright © 2011-2022 走看看