zoukankan      html  css  js  c++  java
  • RabbitMQ 之 WorkQueues工作队列

    模型图

    为什么会出现 work queues?

    前提:使用 simple 队列的时候 (上一篇博客)
    我们应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),而消费者接收完消息
    后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用


    那么怎么让消费者同事处理多个消息呢?

    在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了

    使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)
    来解决这一问题,使得系统的伸缩性更加容易。

    Round-robin(轮询分发)

    生产者发送消息

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class Send {
    11     private static final String QUEVE_NAME = "test_work_queue";
    12 
    13     public static void main(String[] args) throws Exception {
    14 
    15         Connection connection = RabbitMqConnectionUtil.getConnection();
    16         Channel channel = connection.createChannel();
    17         channel.queueDeclare(QUEVE_NAME, false, false, false, null);
    18         for (int i = 0; i < 50; i++) {
    19             String msg = "hello " + i;
    20             System.out.println(msg);
    21             channel.basicPublish("", QUEVE_NAME, null, msg.getBytes());
    22             Thread.sleep(i * 20);
    23         }
    24         channel.close();
    25         connection.close();
    26     }
    27 }

    消费者 1

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Recv1 {
     9     private static final String QUEVE_NAME = "test_work_queue";
    10     public static void main(String[] args) throws Exception {
    11         Connection connection = RabbitMqConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         DefaultConsumer consumer = new DefaultConsumer(channel) {
    14             @Override
    15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16                 String msg = new String(body);
    17                 System.out.println("recv1"+msg);
    18                 try {
    19                     Thread.sleep(2000);
    20                 } catch (InterruptedException e) {
    21                     e.printStackTrace();
    22                 }finally {
    23                     System.out.println(1+"OK");
    24                 }
    25             }
    26         };
    27         channel.basicConsume(QUEVE_NAME,true,consumer);
    28     }
    29 }

    消费者 2

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Recv2 {
     9     private static final String QUEVE_NAME = "test_work_queue";
    10     public static void main(String[] args) throws Exception {
    11         Connection connection = RabbitMqConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         DefaultConsumer consumer = new DefaultConsumer(channel) {
    14             @Override
    15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16                 String msg = new String(body);
    17                 System.out.println("recv2"+msg);
    18                 try {
    19                     Thread.sleep(1000);
    20                 } catch (InterruptedException e) {
    21                     e.printStackTrace();
    22                 }finally {
    23                     System.out.println(2+"OK");
    24                 }
    25             }
    26         };
    27         channel.basicConsume(QUEVE_NAME,true,consumer);
    28     }
    29 }

    测试

    备注:消费者 1 我们处理时间是 ;而消费者 2 中处理时间是 2s;但是我们看到的现象并不是 1 处理的多 消费者 2 处理的

    消费者 1 中将偶数部分处理掉了 

    消费者2中将基数部分处理掉了

    1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
    2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
    按道理消费者 1 获取的比消费者 2 要多

    这种方式叫做轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个
    的分

    如果想要代码可以留言 我可以私发

  • 相关阅读:
    spring aop实现过程之三Spring AOP中Aspect编织的实现
    spring aop实现过程之一代理对象的生成
    数据库常用面试题(SQL Server) (转载)
    回溯法解八后问题
    masmplus增加调试工具
    c++ new关键字 详解
    EMU8086 编译器使用简介
    汇编操作显存
    回溯法简介
    汇编链接时 错误:unresolved external symbol _WinMainCRTStartup
  • 原文地址:https://www.cnblogs.com/wh1520577322/p/10059628.html
Copyright © 2011-2022 走看看