zoukankan      html  css  js  c++  java
  • work工作消息队列Round-robin与Fair dispatch

    一:介绍

    1.模型

      有两种情形,分别是轮训分发与公平分发。

      

    2.出现的场景

      考虑到simple queue中的缺点。

      因为生产者发送消息后,消费者消费要花费时间,这个会造成消息的堆积。

    二:Round robin--轮循

    1.发送程序

      这个与简单程序类似,只是发送多条数据而已。

     1 package com.mq.work.round;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class RoundWorkSend {
     8     private static final String QUENE_NAME="test_work_queue";
     9     public static void main(String[] args) throws Exception {
    10         //获取一个连接
    11         Connection connection= ConnectionUtil.getConnection();
    12         //从连接中获取一个通道
    13         Channel channel=connection.createChannel();
    14         //创建队列声明
    15         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    16 
    17         //消息与发送放入for循环
    18         for (int i=0;i<50;i++){
    19             String msg="hello "+i;
    20             System.out.println("[send msg]:"+msg);
    21             channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
    22             Thread.sleep(i*1);
    23         }
    24 
    25         //关闭连接
    26         channel.close();
    27         connection.close();
    28     }
    29 }

    2.消费者一

     1 package com.mq.work.round;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class RoundWorkReceive1 {
     9     private static final String QUENE_NAME="test_work_queue";
    10     public static void main(String[] args)throws Exception{
    11         //获取一个连接
    12         Connection connection = ConnectionUtil.getConnection();
    13         //创建通道
    14         Channel channel = connection.createChannel();
    15         //创建队列声明
    16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    17         //创建消费者
    18         DefaultConsumer consumer=new DefaultConsumer(channel){
    19             @Override
    20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    21                 String msg=new String(body,"utf-8");
    22                 System.out.println("[1]receive msg:"+msg);
    23                 try {
    24                     Thread.sleep(200);
    25                 } catch (InterruptedException e) {
    26                     e.printStackTrace();
    27                 }finally {
    28                     System.out.println("done");
    29                 }
    30             }
    31         };
    32         //监听队列
    33         boolean autoAck=true;
    34         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    35     }
    36 }

    3.消费者二

     1 package com.mq.work.round;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class RoundWorkReceive2 {
     9     private static final String QUENE_NAME="test_work_queue";
    10     public static void main(String[] args)throws Exception{
    11         //获取一个连接
    12         Connection connection = ConnectionUtil.getConnection();
    13         //创建通道
    14         Channel channel = connection.createChannel();
    15         //创建队列声明
    16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    17         //创建消费者
    18         DefaultConsumer consumer=new DefaultConsumer(channel){
    19             @Override
    20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    21                 String msg=new String(body,"utf-8");
    22                 System.out.println("[2]receive msg:"+msg);
    23                 try {
    24                     Thread.sleep(300);
    25                 } catch (InterruptedException e) {
    26                     e.printStackTrace();
    27                 }finally {
    28                     System.out.println("done");
    29                 }
    30             }
    31         };
    32         //监听队列
    33         boolean autoAck=true;
    34         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    35     }
    36 }

    4.现象

      send

      

      receive1:

      

      receive2:

      

    三:fair dispatcher

    1.介绍

      使用公平分发需要关闭自动应答,改成手动。

      有一种通俗的说法是:能者多劳。 

    2.生产者

      需要改动的地方是:每个消费者在得到确认消息之前,消息队列不得发送一个消息给消费者,一次只能处理一个消息。

     1 package com.mq.work.fair;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class FairWorkSend {
     8     private static final String QUENE_NAME="test_work_queue";
     9     public static void main(String[] args) throws Exception {
    10         //获取一个连接
    11         Connection connection= ConnectionUtil.getConnection();
    12         //从连接中获取一个通道
    13         Channel channel=connection.createChannel();
    14         //创建队列声明
    15         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    16 
    17         //限制发送给一个消费者不得超过1条
    18         int prefetchCount=1;
    19         channel.basicQos(prefetchCount);
    20 
    21         //消息与发送放入for循环
    22         for (int i=0;i<50;i++){
    23             String msg="hello "+i;
    24             System.out.println("[send msg]:"+msg);
    25             channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
    26             Thread.sleep(i*1);
    27         }
    28 
    29         //关闭连接
    30         channel.close();
    31         connection.close();
    32     }
    33 }

    3.消费者一

      需要改动的行数,14,18,33,38

     1 package com.mq.work.fair;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class FairWorkReceive1 {
     9     private static final String QUENE_NAME="test_work_queue";
    10     public static void main(String[] args)throws Exception{
    11         //获取一个连接
    12         Connection connection = ConnectionUtil.getConnection();
    13         //创建通道
    14         final Channel channel = connection.createChannel();
    15         //创建队列声明
    16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    17 
    18         //一次只能发送一个消息
    19         channel.basicQos(1);
    20 
    21         //创建消费者
    22         DefaultConsumer consumer=new DefaultConsumer(channel){
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25                 String msg=new String(body,"utf-8");
    26                 System.out.println("[1]receive msg:"+msg);
    27                 try {
    28                     Thread.sleep(200);
    29                 } catch (InterruptedException e) {
    30                     e.printStackTrace();
    31                 }finally {
    32                     System.out.println("done");
    33                     //手动应答
    34                     channel.basicAck(envelope.getDeliveryTag(),false);
    35                 }
    36             }
    37         };
    38         //监听队列,不是自动应答
    39         boolean autoAck=false;
    40         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    41     }
    42 }

    3.消费者二

      与消费者一不同点在于消费每个消息的时间不同。

     1 package com.mq.work.fair;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class FairWorkReceive2 {
     9     private static final String QUENE_NAME="test_work_queue";
    10     public static void main(String[] args)throws Exception{
    11         //获取一个连接
    12         Connection connection = ConnectionUtil.getConnection();
    13         //创建通道
    14         final Channel channel = connection.createChannel();
    15         //创建队列声明
    16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    17 
    18         //一次只能发送一个消息
    19         channel.basicQos(1);
    20 
    21         //创建消费者
    22         DefaultConsumer consumer=new DefaultConsumer(channel){
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25                 String msg=new String(body,"utf-8");
    26                 System.out.println("[1]receive msg:"+msg);
    27                 try {
    28                     Thread.sleep(500);
    29                 } catch (InterruptedException e) {
    30                     e.printStackTrace();
    31                 }finally {
    32                     System.out.println("done");
    33                     //手动应答
    34                     channel.basicAck(envelope.getDeliveryTag(),false);
    35                 }
    36             }
    37         };
    38         //监听队列,不是自动应答
    39         boolean autoAck=false;
    40         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    41     }
    42 }

    4.现象

      消费者一:

      

      消费者二:

       

  • 相关阅读:
    接口测试基础operation
    关于Fiddler常见问题之一
    接口测试用例编写规则
    Codeforces 959E. Mahmoud and Ehab and the xor-MST 思路:找规律题,时间复杂度O(log(n))
    Codeforces 930A. Peculiar apple-tree (dfs)
    51nod 2020 排序相减(暴力解法)
    《汇编语言(第三版)》pushf 和 popf 指令,以及标志寄存器在 Debug 中的表示
    DF标志和串传送指令
    《汇编语言(第三版)》cmp指令
    《汇编语言(第三版)》标志寄存器
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8598322.html
Copyright © 2011-2022 走看看