一:介绍
1.异步模式介绍
Channel对象提供的ConfirmListener回调方法只包含deliveryTag(当前Channel发出的消息序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中就增加一个元素,每回调一次handleAck方法,unconfirm集合就删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
二:程序
1.生产者
1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.ConfirmListener; 6 import com.rabbitmq.client.Connection; 7 8 import java.io.IOException; 9 import java.util.Collections; 10 import java.util.SortedSet; 11 import java.util.TreeSet; 12 13 public class Send { 14 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 15 public static void main(String[] args)throws Exception{ 16 Connection connection= ConnectionUtil.getConnection(); 17 Channel channel=connection.createChannel(); 18 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 19 //生产者调用confirmSelect将channel设置为nconfirm模式 20 channel.confirmSelect(); 21 final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>()); 22 channel.addConfirmListener(new ConfirmListener() { 23 //没有问题 24 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 25 if (multiple){ 26 System.out.println("handleAck multiple"); 27 confirmSet.headSet(deliveryTag+1).clear(); 28 }else{ 29 System.out.println("handleAck false"); 30 confirmSet.remove(deliveryTag); 31 } 32 } 33 //有问题 34 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 35 if (multiple){ 36 System.out.println("handleNack multiple"); 37 confirmSet.headSet(deliveryTag+1).clear(); 38 }else{ 39 System.out.println("handleNack false"); 40 confirmSet.remove(deliveryTag); 41 } 42 } 43 }); 44 String msg="success"; 45 while (true){ 46 long seqNo=channel.getNextPublishSeqNo(); 47 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 48 confirmSet.add(seqNo); 49 } 50 51 } 52 }
2.消费者
1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Receive { 9 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 10 public static void main(String[] args)throws Exception { 11 Connection connection = ConnectionUtil.getConnection(); 12 Channel channel = connection.createChannel(); 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 17 System.out.println(new String(body,"utf-8")); 18 } 19 }); 20 } 21 }
3.现象
Send: