zoukankan      html  css  js  c++  java
  • Work Queues(工作队列)

    1.模型

    2.创建生产者

    package com.dwz.rabbitmq.work;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**           |--c1
     * p---Queue--|
     *            |--c2
     */
    public class Send {
        private static final String QUEUE_NAME = "test_work_queue";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            for(int i = 0; i < 50; i++) {
                String msg = "send:--" + i;
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            
            channel.close();
            connection.close();
        }
    }

    3.创建消费者1

    package com.dwz.rabbitmq.work;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class rev01 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //自动接收消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("rev01:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            //监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    4.创建消费者2

    package com.dwz.rabbitmq.work;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class rev02 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //自动接收消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("rev02:" + msg);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            //监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    5.运行代码

    预期结果:

    按照延迟加载时间获取消息数量不同,数量比例为 延时1:延时2

    测试结果如下:

    1.两个消费者先启动完成,再启动生产者,这时会采用轮询分发的方式,消费者1和消费者2各拿到一半的消息

    2.生产者先启动完成,消费者按照先后顺序启动,会发现所有消息都被先启动的那个消费者接收

    达到预期结果的解决方案:

    消费者限流+手动签收确认

    消费者限流:channel.basicQos(1);

    手动签收确认:channel.basicAck(envelope.getDeliveryTag(), false);

  • 相关阅读:
    Webbrowser 取消下载提示框
    The service ‘xxx’ configured for WCF is not registered with the Autofac container
    Code First 中的 TPH TPT TPC
    SQL Server 之 解锁
    导入 github 步骤
    初试 pyhton 简易采集
    js 一些小技巧
    linux 学习笔记
    lnmp 环境搭建后,pathinfo 模式支持的配制。
    windows 快捷键相关命令
  • 原文地址:https://www.cnblogs.com/zheaven/p/11799675.html
Copyright © 2011-2022 走看看