zoukankan      html  css  js  c++  java
  • Native RabbitMQ ProducerComfirm

    发布者确认模式

    注意,这里需要开启确认模式:channel.confirmSelect()

    /**
     * 发布者确认回调
     * handleAck:消息经由交换器,路由键,成功到达了队列,会调用此方法
     * handleNack:消息经由交换器,路由键在到达队列的途中,MQ内部出现异常会调用此方法
     *
     * @author zhangjianbing
     * time 2020/9/7
     */
    @SuppressWarnings("Duplicates")
    public class ProducerConfirm {
    
        public static final String EXCHANGE_NAME = "confirm-exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("1.1.1.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            channel.confirmSelect();// 开启发布者确认
            // 设置路由键
            String routeKey = "confirm";
            // 消息
            String message = "hello rabbit message queue";
    
            String queueName = "CONFIRM.CALLBACK.QUEUE";
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 将队列和交换器通过路由键绑定在一起
            channel.queueBind(queueName, EXCHANGE_NAME, routeKey);
    
            channel.addConfirmListener(new ConfirmListener() {
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("【消息成功到达了队列】");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("【MQ内部出现异常】");
                }
    
            });
    
            // 发送消息
            channel.basicPublish(EXCHANGE_NAME, routeKey, false, null, message.getBytes());
    
            try {
                Thread.sleep(500);// 【这里需要将关闭连接的操作去掉,或者延迟,否则回调不到】
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    
    }
    
  • 相关阅读:
    【Swift】WKWebView与JS的交互使用
    【React Native】进阶指南之二(手势响应系统)
    【React Native】进阶指南之一(特定平台、图片加载、动画使用)
    React Native适配IPhoneX系列设备之<SafeAreaView />
    【React Native】react-native之集成支付宝支付、微信支付
    【React Natvie】React-native-swiper的安装和配置【ES6】
    React Native之React Navigation踩坑
    遭遇裁员,如何渡过心理危机?
    Spring 核心技术与产品理念剖析【下】
    Spring 核心技术与产品理念剖析【上】
  • 原文地址:https://www.cnblogs.com/zhangjianbing/p/13629334.html
Copyright © 2011-2022 走看看