zoukankan      html  css  js  c++  java
  • rabbitmq4-工作队列及公平分发模式

    建议大家如果没有看前一篇文章的时候,还是看一看第一篇文章,因为上篇文章的确把很多的概念都讲解的比较清楚。我发现有很多东西在单独使用rabbitmq是做不了的,例如自定义message投递的id,所以我希望快速的把这几篇介绍的博文写完,然后进入springboot的整合篇,但是我不建议新手一上来就开始使用springboot的整合,就想我在群里面听到的,不知道channel为何物更别提其他的概念了,只有一个稳扎稳打的基础在往高级的地方学习的时候才不费力。

    一、简单工作队列

    image.png
    我想大概这种模式的应用场景也就剩下了应用层面的解耦了吧,话不多话,下面直接用代码展示

    二、生产者代码:

    public class Producer {
    
        public static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException{
    
            final Connection conn = ConnUtils.getConn();
            final Channel channel = conn.createChannel();
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // 这个目前在单独使用rabbitmq的时候没有办法找到自定义这个消息标识的办法,但是在和springboot整合之后会提供这样的方法
                    System.out.println(multiple);
                    System.out.println("wtf 需要这么热吗:::::"+deliveryTag);
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("啊哈哈,你被拒绝了……");
                }
            });
      // 这个地方也可以搞一个线程来进行发送
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 ".getBytes());
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +1".getBytes());
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +2".getBytes());
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +3".getBytes());
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +4".getBytes());
            channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +5".getBytes());
            channel.close();
            conn.close();
        }
    }

    三、两个消费者(只需要把代码拷贝一份就可以了)

    public class Consumer01 {
        public static final String QUEUE_NAME = "work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection conn = ConnUtils.getConn();
            final Channel channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("Recv001"+"message == "+new String(body,"utf-8"));
                    channel.basicAck(deliveryTag,false);
                }
            };
            channel.basicConsume(QUEUE_NAME,false,consumer);
        }
    }

    先启动两个消费者,因为消息太少,如果先启动生产者,在启动消费者,一个消费者立马就消费完了。

    四、结果分析

    image.png
    image.png
    我们发现两个消费者总是已奇偶的形式出现的,加入两个消费者的消费能力不一样,消费者1消费能力比较高,但是以这种模式的话,那么整个系统的消费能力的上线就有比较弱的消费者2来决定了。所以下面介绍一种公平分发模式:公平指的是能者多劳

    我们在channel申明的下面加一行代码:我们分别设置consumer1的消费能力为3,consumer2的消费者能力为1

     /**
      * prefetchCount:告诉MQ不要同时给一个消费者推送超过prefetchCount个消息,
       * 即一点prefetchCount个消息没有应答,该消费者就会发生阻塞
       * global:指的是该设置是针对该consumer还是针对channel级别
       */
    channel.basicQos(3false);

    下面我们在观察结果:
    image.png
    image.png
    我们可以看到奇偶的模式不见了,而且消费者1的吞吐量是大于消费者2的

    本节到这里就结束了,有很多的介绍希望大家多去看看前面的文章。

  • 相关阅读:
    体温登记APP总结
    体温登记day4
    体温登记day3
    寒期周总结五
    体温登记day2
    体温登记day1
    家庭记账本day7
    家庭记账本day6
    家庭记账本day5
    家庭记账本day4
  • 原文地址:https://www.cnblogs.com/fkxuexi/p/9588373.html
Copyright © 2011-2022 走看看