zoukankan      html  css  js  c++  java
  • RabbitMQ:Confirm确认消息 Return返回消息

    1.Confirm消息确认机制

    消息的确认:是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。

    生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

    生产端

        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.132");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //通过连接创建一个Channel
            Channel channel = connection.createChannel();
            //指定消息的投递模式:消息的确认模式
            channel.confirmSelect();
    
            //通过Channel发送数据
            channel.basicPublish("","hello",null,"hello world".getBytes());
            //添加一个确认监听
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("----handleAck---");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("----handleNack---");
                }
            });
        }
    

    消费端:

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //创建一个连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.132");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //通过连接创建一个Channel
            Channel channel = connection.createChannel();
            //创建一个队列
            String queueName = "hello";
            channel.queueDeclare(queueName,true,false,false,null);
            //创建一个消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //设置Channel
            channel.basicConsume(queueName,true,consumer);
            //获取消息
            while (true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("消费端:"+msg);
            }
        }
    

     运行结果:

    消费端:

     生产端:

    2.Return返回消息机制

    某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这时候如果我们需要监听这种不可达的消息,就需要使用Return Listener

    在API中有个一重要配置项:

    Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,则broker端自动删除该消息。

    Return消息机制流程:

     消费端跟上文一样,

    生产端:

        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.10.132");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //通过连接创建一个Channel
            Channel channel = connection.createChannel();
    
    
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                                         String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    System.out.println(properties);
                    System.out.println(Arrays.toString(body));
                }
            });
    
            //通过Channel发送数据
            // 在这里要设置Mandatory(第三个参数)为true,否则broker会自动删除消息
            channel.basicPublish("","return",true,null,"hello world".getBytes());
        }  

     打印结果:

  • 相关阅读:
    选择排序与冒泡排序
    判断是否为偶数
    mysql基础之mysql双主(主主)架构
    mysql基础之mysql主从架构半同步复制
    mysql基础之mysql主从架构
    mysql基础之数据库备份和恢复实操
    mysql基础之数据库备份和恢复的基础知识
    mysql基础之日志管理(查询日志、慢查询日志、错误日志、二进制日志、中继日志、事务日志)
    mysql基础之查询缓存、存储引擎
    mysql基础之数据库变量(参数)管理
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/12986683.html
Copyright © 2011-2022 走看看