zoukankan      html  css  js  c++  java
  • rabbitMq(2)之“Work”模式

    1.架构图

      

     模式简介

    • 一个消息生产者P,一个消息存储队列Q,多个消息消费者C
    • Work模型能够较好的解决资源密集型场景的问题,不需要像Hello World那样孤注一掷的等唯一的消费者消费完
    • 多个消费者,多管齐下,更加高效的并行处理消息

    2.实践应用

    2.1  生产者

    package com.rabbitmq.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.utils.ConnectUtil;
    
    public class Producer_Work {
        private static String QUEUE_NAME = "test_queue_work";
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //同一时刻只给消费者发送一条
    //        channel.basicQos(1);
            //消息内容
            int i = 0;
            while(i<50){
                String message = "hello "+i;
                //发送消息队列
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                //发送成功,打印发送信息
                System.out.println("生产者发送消息是:"+message);
                i++;
                Thread.sleep(i*10);
            }
            //关闭通道和连接
            channel.close();
            con.close();
        }
    
    }

    2.2 消费者 

    消费者1

    package com.rabbitmq.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.utils.ConnectUtil;
    
    public class Work_Consumer1 {
        private static String QUEUE_NAME = "test_queue_work";
        public static void main(String[] argv) throws Exception{
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //同一时刻只给消费者发送一条
            //channel.basicQos(1);
            //创建消息者
            QueueingConsumer consumer = new QueueingConsumer(channel); 
            //发送消息队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while(true){
                Delivery delivery= consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("g 消费者接受的消息是:"+message);
                Thread.sleep(10);
                //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    消费者2

    package com.rabbitmq.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.utils.ConnectUtil;
    
    public class Work_Consumer2 {
        private static String QUEUE_NAME = "test_queue_work";
        public static void main(String[] argv) throws Exception{
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //创建消息者
            //同一时刻只给消费者发送一条
            //channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel); 
            //发送消息队列(true:自动确认,false:手动确认)
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while(true){
                Delivery delivery= consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("g 消费者接受的消息是:"+message);
                Thread.sleep(1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    3.测试结果

      3.1测试结果:(默认是公平接受)

    1. 消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
    2. 消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
    3. 其实,这样是不合理的,应该是消费者1(sleep时间短,消费消息能力高)要比消费者2(sleep时间长,消费消息能力低)获取到的消息多才对。

      其实,只需要设置在同一时刻只有一个接受者就可以达到目的了(非公平模式)

      channel.basicQos(1);

      在两个消费者中添加以上两个代码即可。

      3.2修改后测试结果:

      消费者1比消费者2获取的消息更多。

      3.3 消息的确认模式

      消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

      模式1:自动确认 (消费者1)

      只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

      模式2:手动确认(消费者2)

      消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

    4.总结

    • 启动两个消费者,他们消费同一个队列就可以实现Worker模式了,也就是多个消费者同时消费一个消息队列中的消息。这样就提高了消息队列的使用效率。 
    • 不再是一个消费者,需要注意公平和非公平模式的区分。
    • 其实还有一点需要注意,那就消费确认,如果不是手动设置了手动确认,而消费者使用后没有确认,队列即使没有收到确认反馈也就不可消费了。
  • 相关阅读:
    勿忘心安
    设△ABC的内角A,B,C,所对的边分别为a,b,c,且acosB-bcosA=3/5c,则tan(A-B)的最大值为
    P1011 车站
    第一天
    P1134 阶乘问题
    P3152 正整数序列
    某数论
    DO YOU WANNA BUILD A SNOW MAN ?
    luogu P1579 哥德巴赫猜想(升级版)
    紫书 习题 10-25 UVa 1575 (有重复元素的全排列+暴搜)
  • 原文地址:https://www.cnblogs.com/fengyan20150508/p/8834994.html
Copyright © 2011-2022 走看看