zoukankan      html  css  js  c++  java
  • Native RabbitMQ ProducerComfirm

    发布者确认模式

    注意,这里需要开启确认模式:channel.confirmSelect()

    /**
     * 发布者确认回调
     * handleAck:消息经由交换器,路由键,成功到达了队列,会调用此方法
     * handleNack:消息经由交换器,路由键在到达队列的途中,MQ内部出现异常会调用此方法
     *
     * @author zhangjianbing
     * time 2020/9/7
     */
    @SuppressWarnings("Duplicates")
    public class ProducerConfirm {
    
        public static final String EXCHANGE_NAME = "confirm-exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("1.1.1.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.confirmSelect();// 开启发布者确认
            // 设置路由键
            String routeKey = "confirm";
            // 消息
            String message = "hello rabbit message queue";
    
            String queueName = "CONFIRM.CALLBACK.QUEUE";
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 将队列和交换器通过路由键绑定在一起
            channel.queueBind(queueName, EXCHANGE_NAME, routeKey);
    
            channel.addConfirmListener(new ConfirmListener() {
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("【消息成功到达了队列】");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("【MQ内部出现异常】");
                }
    
            });
    
            // 发送消息
            channel.basicPublish(EXCHANGE_NAME, routeKey, false, null, message.getBytes());
    
            try {
                Thread.sleep(500);// 【这里需要将关闭连接的操作去掉,或者延迟,否则回调不到】
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    
    }
    
  • 相关阅读:
    [redis读书笔记] 第二部分 sentinel
    [redis读书笔记] 第三部分 多机数据库的实现 复制
    单线程的REDIS为什么这么快?
    [redis读书笔记] 第二部分 单机数据库 RDB持久化
    [redis读书笔记] 第二部分 单机数据库 数据库实现
    选靓号——拼多多笔试题(贪心+暴力)
    种树——拼多多笔试题(暴搜+剪枝)
    【学习笔记】《Java编程思想》 第8~11章
    leetcode——二分
    CodeForces-1265E(期望)
  • 原文地址:https://www.cnblogs.com/zhangjianbing/p/13629334.html
Copyright © 2011-2022 走看看