zoukankan      html  css  js  c++  java
  • 四、RabbitMq的工作模式( Work Queues)

    Work Queues

    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

    轮训分发消息

    在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

    抽取工具类

    public class RabbitMqUtils {
        //得到一个连接的 channel
        public static Channel getChannel() throws Exception {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.1.100");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("my_vhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    

    启动生产者

    发送10条消息

    public class Task01 {
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            try (Channel channel = RabbitMqUtils.getChannel();) {
                // 让消息持久化
                boolean durable = false;
                channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
                //从控制台当中接受信息
                for (int i = 0; i < 10; i++) {
                    String message = "生产者发送消息" + i;
                    System.out.println(message);
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                }
            }
        }
    }
    

    启动两个工作线程(消费者)

    worker01 和 worker02

    public class Worker01 {
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody());
                System.out.println("C1接收到消息:" + receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(consumerTag + "C1消费者取消消费接口回调逻辑");
            };
            System.out.println("C1 消费者启动等待消费......");
    
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    
    public class Worker02 {
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody());
                System.out.println("C2接收到消息:" + receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(consumerTag + "C2消费者取消消费接口回调逻辑");
            };
            System.out.println("C2 消费者启动等待消费......");
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    

    结果展示

    生产者发送10条数据,分别被两个消费者轮训消费

    task01

    worker01worker02

    默认情况下,消息是轮训消费的,如果要work02的机器性能高,想多消费一些消息。可以修改basicQos

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    
    basicQos根据情况设置:消费者C1设置为1,消费者C2设置为3
    注意:这时候,不能使用自动应答的方式,而是应改为手动应答的方式。否则还是轮询的接收方式。自动应答,是消息被发送出去之后,不管消费者是否消费成功,都被rabbitmq认为是已消费完成。然后就会发送下一条消息给消费者
    
  • 相关阅读:
    AOP概述
    函数调用规定
    lexical scoping vs. dynamic scoping
    [C++]C++11新特性
    JavaScript Tutorial 05 #Class#
    JavaScript Tutorial 04 #Function#
    JavaScript Tutorial 03 #Array#
    JavaScript Tutorial 02 #Object#
    JavaScript Tutorial 01 #Basic#
    Win32 Thread Information Block
  • 原文地址:https://www.cnblogs.com/linhp/p/15219445.html
Copyright © 2011-2022 走看看