zoukankan      html  css  js  c++  java
  • RabbitMQ_2、工作队列

    工作队列(竞争消费者模式)

    官方案例

    工作队列_消息发送

    /**
     * @PackageName : com.rzk
     * @FileName : Send
     * @Description : 工作队列-轮询-消息生产者
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:21
     * @Version : 1.0.0
     */
    public class Send {
    
        //定义队列名称
        private final static String QUEUE_NAME = "work_rr";
    
        public static void main(String[] argv) throws Exception {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("");
            factory.setVirtualHost("/");
            factory.setPassword("");
            factory.setPort(5672);
    
            try (
                    //连接工厂创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
                /**
                 * 绑定队列
                 * 声明队列
                 *  第一个参数 queue :队列名称
                 *  第二个参数 durable :是否持久化
                 *  第三个参数 Exclusive :排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明的连接可见,并在连接断开时自动删除。
                 * 这里需要注意三点:
                 *      1 .排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他他队
                 *      2 . ”百次“,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的;排他队列这个与昔通队歹怀同。
                 *      3 .即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个容户端发送读取消息的应用场景。
                 * 第四个参数 Auto-delete :自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
                 */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 0; i < 20; i++) {
                    String message = " Send  "+i;
                    //队列消息的生产者:发送消息
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] Sent '" + message + "'" + i);
                }
            }
        }
    }
    
    

    工作队列_消息接收

    新建两个消息消费去接收

    1

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 工作队列-轮询-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv01 {
        private final static String QUEUE_NAME = "work_rr";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                /**
                 * 手动确认
                 * multiple: 是否确认多条
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
            };
            /**
             * 监听队列消费消息
             * autoAck:自动应答
             * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
             */
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
        }
    }
    

    2

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 工作队列-轮询-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv02 {
        private final static String QUEUE_NAME = "work_rr";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                //模拟消费耗时
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                /**
                 * 手动确认
                 * multiple: 是否确认多条或单条数据
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
            };
            //监听队列消费消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    
    

    监听队列消费消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    在 Boolean autoAck = false的情况下,如果消费者1宕机了,消息队列没有收到消费者发送回的应答,就会将这个消息发送给下一个消费者处理。直到消费者处理完这个消息,并向消息队列发送了一个消息应答,告诉消息队列此时这个消息已经处理完成,消息队列才会将这个消息从内存中删除。

    工作队列的优点:解决简单队列 当生产者生产消息大与消费者消费能力时,加多几个消费者,让消费者的消费能力大于等于生产者生产能力,这样就能减少多余的消息堆积在队列里面

    公平模式

    需要把这个代码放在接收消息类里面

            //限制消费者每次只能接受一条,处理完才能接受下一条消息
            channel.basicQos(1);
    
  • 相关阅读:
    jenkins构建完成后,执行的命令行的东西也会自动结束的解决办法
    解决ansible上传速度慢的问题
    uniq的坑坑
    tomcat问题
    R语言入门:对于boxplot()箱型图的直观理解
    R语言入门:条形图barplot()的绘制
    R语言入门:数据框的创建和访问
    Python当中的命令行参数sys.argv[]的使用方法
    R语言清除单个变量和全部变量
    linux下添加环境变量
  • 原文地址:https://www.cnblogs.com/rzkwz/p/14925181.html
Copyright © 2011-2022 走看看