zoukankan      html  css  js  c++  java
  • 消息确认机制,confirm异步

    一:介绍

    1.异步模式介绍

      Channel对象提供ConfirmListener()回调方法只包含deliverTag(当前Channel发出的序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,没publish一条数据,集合就加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

    二:程序

    1.生产者

     1 package com.mq.AsynConfirm;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.ConfirmListener;
     6 import com.rabbitmq.client.Connection;
     7 
     8 import java.io.IOException;
     9 import java.util.Collections;
    10 import java.util.SortedSet;
    11 import java.util.TreeSet;
    12 
    13 public class Send {
    14     private static final String QUEUE_NAME="test_queue_confirm_asyn";
    15     public static void main(String[] args)throws Exception{
    16         Connection connection= ConnectionUtil.getConnection();
    17         Channel channel=connection.createChannel();
    18         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    19         //生产者调用confirmSelect将channel设置为nconfirm模式
    20         channel.confirmSelect();
    21         final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>());
    22         channel.addConfirmListener(new ConfirmListener() {
    23             //没有问题
    24             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    25                 if (multiple){
    26                     System.out.println("handleAck multiple");
    27                     confirmSet.headSet(deliveryTag+1).clear();
    28                 }else{
    29                     System.out.println("handleAck false");
    30                     confirmSet.remove(deliveryTag);
    31                 }
    32             }
    33             //有问题
    34             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    35                 if (multiple){
    36                     System.out.println("handleNack multiple");
    37                     confirmSet.headSet(deliveryTag+1).clear();
    38                 }else{
    39                     System.out.println("handleNack false");
    40                     confirmSet.remove(deliveryTag);
    41                 }
    42             }
    43         });
    44         String msg="success";
    45         while (true){
    46             long seqNo=channel.getNextPublishSeqNo();
    47             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    48             confirmSet.add(seqNo);
    49         }
    50 
    51     }
    52 }

    2.消费者

     1 package com.mq.AsynConfirm;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Receive {
     9     private static final String QUEUE_NAME="test_queue_confirm_asyn";
    10     public static void main(String[] args)throws Exception {
    11         Connection connection = ConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    14         channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    17                 System.out.println(new String(body,"utf-8"));
    18             }
    19         });
    20     }
    21 }

    3.现象

      Send:

      

  • 相关阅读:
    Elasticsearch的CURD、复杂查询、聚合函数、映射mappings
    Python操作Elasticsearch对象
    Python连接Elasticsearch
    Elasticsearch的分析过程,内置字符过滤器、分析器、分词器、分词过滤器(真是变态多啊!美滋滋)
    Elasticsearch的数据组织
    Elasticsearch背景初识
    Nginx之负载均衡
    linux常用命令
    百度地图API,定位您的当前位置
    使用gulp自动构建项目
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8635633.html
Copyright © 2011-2022 走看看