zoukankan      html  css  js  c++  java
  • RabbitMQ(7) confirm 异步模式

    // Sorted set holding the tags (publish sequence numbers) of messages that have
    // been published but not yet confirmed; wrapped in a synchronized view.
    final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());

    增加监听器
    channel.addConfirmListener(new ConfirmListener() {

        /**
         * Invoked when a positive confirmation (ack) is reported.
         *
         * @param deliveryTag tag of the confirmed message; when {@code multiple} is
         *                    true, the tag of the last message covered by the confirmation
         * @param multiple    whether the confirmation covers several messages at once
         */
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("消息发送成功" + deliveryTag + "是否多条" + multiple);
            if (!multiple) {
                confirmSet.remove(deliveryTag);
                return;
            }
            // headSet's upper bound is exclusive, so +1 makes it include deliveryTag itself.
            confirmSet.headSet(deliveryTag + 1).clear();
        }

        /**
         * Invoked when a negative confirmation (nack) is reported. The affected tags
         * are dropped from the unconfirmed set exactly as for an ack.
         *
         * @param deliveryTag tag of the rejected message; when {@code multiple} is
         *                    true, the tag of the last message covered by the nack
         * @param multiple    whether the nack covers several messages at once
         */
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("消息发送失败" + deliveryTag + "是否多条" + multiple);
            if (!multiple) {
                confirmSet.remove(deliveryTag);
                return;
            }
            // headSet's upper bound is exclusive, so +1 makes it include deliveryTag itself.
            confirmSet.headSet(deliveryTag + 1).clear();
        }
    });

    // Publish ten messages, tracking each one's sequence number until it is confirmed.
    String routingKey ="goods.delete";
    for (int i = 0; i <10; i++) {
        String message = "hello ps"+i;
        long tag = channel.getNextPublishSeqNo();
        // Register the tag BEFORE publishing. The confirm listener runs on another
        // thread; if the tag were added after basicPublish, a confirm arriving first
        // would find nothing to remove and the tag would stay in the set forever.
        confirmSet.add(tag);
        channel.basicPublish(Exchange_NAME,routingKey,null,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(tag);
    }
    发送者
    package com.aynu.bootamqp.service;
    
    import com.aynu.bootamqp.commons.utils.Amqp;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Demo publisher: sends ten messages to the {@code hello} topic exchange with the
     * channel in confirm mode, and tracks outstanding confirmations asynchronously
     * through a {@link ConfirmListener}.
     */
    public class Send {

        private final static String Exchange_NAME ="hello";

        /** Maximum time, in milliseconds, to wait for outstanding confirms before closing. */
        private static final long CONFIRM_TIMEOUT_MILLIS = 5_000L;

        /** Pause, in milliseconds, between checks of the unconfirmed set while waiting. */
        private static final long CONFIRM_POLL_MILLIS = 50L;

        /**
         * Publishes the demo messages, waits (bounded) for their confirmations, then
         * closes the channel and the connection.
         *
         * @param args unused
         * @throws IOException          if a channel operation fails
         * @throws TimeoutException     if opening or closing the connection/channel times out
         * @throws InterruptedException if interrupted while waiting for confirmations
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = Amqp.getConnection();
            try {
                Channel channel = connection.createChannel();
                // Declare the exchange.
                channel.exchangeDeclare(Exchange_NAME,"topic");
                // Original intent: hand out only one message at a time to each consumer.
                // NOTE(review): prefetch limits deliveries to consumers; this channel only
                // publishes, so this call is presumably unnecessary - confirm before removing.
                channel.basicQos(1);
                // Put the channel into confirm mode.
                channel.confirmSelect();

                // Tags (publish sequence numbers) of messages not yet confirmed.
                final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());

                channel.addConfirmListener(new ConfirmListener() {

                    /**
                     * Positive confirmation.
                     *
                     * @param l tag of the confirmed message; when {@code b} is true,
                     *          the tag of the last message covered by the confirmation
                     * @param b whether several messages are confirmed at once
                     */
                    @Override
                    public void handleAck(long l, boolean b) throws IOException {
                        System.out.println("消息发送成功"+l+"是否多条"+b);
                        removeSettled(confirmSet, l, b);
                    }

                    /**
                     * Negative confirmation. The tags are dropped from the unconfirmed
                     * set the same way as for an ack; the failure is only logged.
                     */
                    @Override
                    public void handleNack(long l, boolean b) throws IOException {
                        System.out.println("消息发送失败"+l+"是否多条"+b);
                        removeSettled(confirmSet, l, b);
                    }
                });

                String routingKey ="goods.delete";
                for (int i = 0; i <10; i++) {
                    String message = "hello ps"+i;
                    long tag = channel.getNextPublishSeqNo();
                    // Register the tag BEFORE publishing. The listener runs on another
                    // thread; if the tag were added after basicPublish, a confirm arriving
                    // first would find nothing to remove and the tag would never be cleared.
                    confirmSet.add(tag);
                    channel.basicPublish(Exchange_NAME,routingKey,null,
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println(tag);
                }

                // Confirms arrive asynchronously; give them a bounded amount of time to
                // come in instead of closing the channel right after the last publish.
                if (!awaitConfirms(confirmSet, CONFIRM_TIMEOUT_MILLIS)) {
                    System.err.println("等待确认超时,未确认的消息tag:" + confirmSet);
                }

                channel.close();
            } finally {
                // Always release the connection, even when a step above failed.
                connection.close();
            }
        }

        /** Removes the tag(s) settled by one ack/nack from the unconfirmed set. */
        private static void removeSettled(SortedSet<Long> pending, long deliveryTag, boolean multiple) {
            if (multiple) {
                // headSet's upper bound is exclusive, so +1 includes deliveryTag itself.
                pending.headSet(deliveryTag + 1).clear();
            } else {
                pending.remove(deliveryTag);
            }
        }

        /** Polls until the unconfirmed set is empty; returns false if the timeout elapses first. */
        private static boolean awaitConfirms(SortedSet<Long> pending, long timeoutMillis)
                throws InterruptedException {
            final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (!pending.isEmpty()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false;
                }
                Thread.sleep(CONFIRM_POLL_MILLIS);
            }
            return true;
        }
    }

    接收者

    package com.aynu.bootamqp.service;
    
    import com.aynu.bootamqp.commons.utils.Amqp;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     * Demo consumer: binds queue {@code hello1} to the {@code hello} topic exchange with
     * the pattern {@code goods.#}, prints each message body and acknowledges it manually.
     */
    public class Receive2 {

        private final static String QUEUE_NAME ="hello1";
        private final static String Exchange_NAME ="hello";

        /**
         * Declares the exchange, queue and binding, then starts consuming with manual acks.
         *
         * @param args unused
         * @throws IOException      if a channel operation fails
         * @throws TimeoutException if opening the connection/channel times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = Amqp.getConnection();
            final Channel channel = connection.createChannel();
            // Declare the exchange here as well (same name and type the publisher uses)
            // so that the bind below does not fail when the consumer is started first.
            channel.exchangeDeclare(Exchange_NAME,"topic");
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Exchange_NAME,"goods.#");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {

                /**
                 * Prints the message body as UTF-8 text, pauses for one second, then
                 * acknowledges this single delivery.
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(msg);
                    boolean interrupted = false;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Remember the interruption; the message is still acknowledged below.
                        interrupted = true;
                    }
                    try {
                        // Manual acknowledgement of this delivery only (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    } finally {
                        if (interrupted) {
                            // Restore the flag so the interruption is not silently lost.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
        }
    }
     
  • 相关阅读:
    FSCapture 取色工具(绿色版 )
    Java EE.JavaBean
    Java EE.JSP.内置对象
    Java EE.JSP.动作组件
    Java EE.JSP.指令
    Java EE.JSP.脚本
    21、多态与多态性、内置方法、反射、异常处理
    20、继承的应用、super、组合
    19、property、绑定方法(classmethod、staticmethod)、继承
    18、类
  • 原文地址:https://www.cnblogs.com/mm163/p/10704705.html
Copyright © 2011-2022 走看看