zoukankan      html  css  js  c++  java
  • RabbitMQ confirm的确认监听模式

    添加确认监听需要开启确认监听模式 实现 addConfirmListener方法confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送
    下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以
    在回调方法中处理该nack消息;

    消费者:
    package com.flying.rabbitmq.api.confirm;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm.#";
            String queueName = "test_confirm_queue";
            
            //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //5 创建消费者 
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            
            while(true){
                Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                
                System.err.println("消费端: " + msg);
            }
            
            
        }
    }

    生产者:

    package com.flying.rabbitmq.api.confirm;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    
    /**
     * 添加确认监听需要开启确认监听模式 实现 addConfirmListener方法
     * confirm模式最大的好处在于他是异步的,一旦发布一条消息,
     * 生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,
     * 当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,
     * 如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;
     */
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            
            //4 指定我们的消息投递模式: 消息的确认模式 
            channel.confirmSelect();
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm.save";
            
            //5 发送一条消息
            String msg = "Hello RabbitMQ Send confirm message!";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    
            // 添加一个确认监听
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.err.println(deliveryTag);
                    System.err.println("-------no ack!-----------");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    
                    System.err.println("-------ack!-----------");
                }
            });
            
            
            
            
            
        }
    }
  • 相关阅读:
    VMWare安装win10提示units specified don’t exist, SHSUCDX can’t install
    WinXP.Http.Post请求错误提示:基础连接已经关闭:发送时发生错误
    如何用PostMan请求WebApi
    无法解决 equal to 运算中 "Chinese_PRC_CI_AS" 和 "Chinese_PRC_CI_AS_WS" 之间的排序规则冲突 解决
    c# Winform PropertyGrid 实现下拉框 多选
    c# Winform GridControl 给列自动生成快捷操作按钮
    Tomcat启动报内存溢出错误:java.lang.OutOfMemoryError: PermGen space异常 解决
    Net Core 项目引用Exceptionless记录使用
    .Net 开源异常日志ExceptionLess搭建
    c# AutoMapper 扩展
  • 原文地址:https://www.cnblogs.com/lflying/p/11107340.html
Copyright © 2011-2022 走看看