zoukankan      html  css  js  c++  java
  • 【RabbitMQ】02 工作队列模式

    首先编写一个工作队列的生产者:

    发送10条消息然后就关闭,10条消息让RabbitMQ先存着

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class WorkQueueInProducer {

        /** Name of the queue that this producer declares and publishes to. */
        private static final String QUEUE_NAME = "work_queue";

        /** Number of messages published by a single run. */
        private static final int MESSAGE_COUNT = 10;

        /**
         * Work-queue producer: connects to the broker, declares the queue, publishes
         * {@link #MESSAGE_COUNT} text messages to it and then disconnects.
         *
         * <p>The connection and the channel are opened in a try-with-resources block so
         * that both are closed (channel first, then connection) even when declaring
         * the queue or publishing a message throws.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if connecting, declaring the queue, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setHost("192.168.2.121");
            connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
            connectionFactory.setVirtualHost("/dzz"); // virtual host; the client default is "/"
            connectionFactory.setUsername("test"); // client default: guest
            connectionFactory.setPassword("123456"); // client default: guest

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String body = "send workQueue msg" + i;
                    // Empty exchange name = default exchange; the routing key is the queue name.
                    channel.basicPublish("", QUEUE_NAME, null, body.getBytes(StandardCharsets.UTF_8));
                }
            }
        }
    }

     然后创建两个消费者:

    两个消费者的代码是一样的,就是接收消息打印即可

    但是不要关闭连接和信道,消费者需要保持运行才能持续接收消息

    package cn.dzz.workQueue;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    public class WorkQueueInConsumer2 {

        /**
         * Work-queue consumer: connects to the broker, declares {@code work_queue}
         * and subscribes to it with a consumer that prints every message body.
         *
         * <p>Neither the channel nor the connection is closed here, so the subscription
         * stays registered after {@code main} returns.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if connecting, declaring the queue or subscribing fails
         */
        public static void main(String[] args) throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.2.121");
            factory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
            factory.setVirtualHost("/dzz"); // virtual host; the client default is "/"
            factory.setUsername("test"); // client default: guest
            factory.setPassword("123456"); // client default: guest

            Connection brokerConnection = factory.newConnection();
            Channel workChannel = brokerConnection.createChannel();

            // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
            workChannel.queueDeclare("work_queue", true, false, false, null);

            // Second argument is autoAck: deliveries are acknowledged automatically.
            workChannel.basicConsume("work_queue", true, printingConsumer(workChannel));
        }

        /**
         * Builds a consumer that writes each delivered body, decoded as UTF-8,
         * to standard output followed by a separator line.
         *
         * @param channel the channel the consumer is bound to
         * @return a consumer that prints every delivery it receives
         */
        private static Consumer printingConsumer(Channel channel) {
            return new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String text = new String(body, StandardCharsets.UTF_8);
                    System.out.println("body(message) " + text);
                    System.out.println("- - - - - over - - - - -");
                }
            };
        }
    }

    先打开两个消费者:

     

    然后再执行生产者发送消息:

     

    RabbitMQ立即把生产者的消息分配过来给两个消费者:

    消费者1输出:

    "C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=50684:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer1
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    body(message) send workQueue msg0
    - - - - - over - - - - -
    body(message) send workQueue msg2
    - - - - - over - - - - -
    body(message) send workQueue msg4
    - - - - - over - - - - -
    body(message) send workQueue msg6
    - - - - - over - - - - -
    body(message) send workQueue msg8
    - - - - - over - - - - -

    消费者2输出:

    "C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=50693:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer2
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    body(message) send workQueue msg1
    - - - - - over - - - - -
    body(message) send workQueue msg3
    - - - - - over - - - - -
    body(message) send workQueue msg5
    - - - - - over - - - - -
    body(message) send workQueue msg7
    - - - - - over - - - - -
    body(message) send workQueue msg9
    - - - - - over - - - - -

    工作队列的主要作用是让多个消费者分摊这10条消息

    消费者之间是处于竞争关系,争夺消息的接收

    工作队列用于任务过重的场景,用来提高任务处理速度

  • 相关阅读:
    概率与数学期望初步
    $Luogu$ $P4316$ 绿豆蛙的归宿(附期望 $dp$ 的设计总结)
    $Luogu$ $P4427$ $[BJOI2018]$ 求和
    $SP3978$ $DISQUERY$ $-$ $Distance$ $Query$
    最近公共祖先模板(未完待续)
    $Luogu$ $P3052$ $[USACO12MAR]$ 摩天大楼里的奶牛 $Cows$ $in$ $a$ $Skyscraper$
    $Luogu$ $P2622$ 关灯问题 $\mathrm{II}$
    [转载] $CF633F$ 题解
    [转载] $Luogu$ $P3933$ 题解
    2020高考回忆录(随便写写
  • 原文地址:https://www.cnblogs.com/mindzone/p/15371951.html
Copyright © 2011-2022 走看看