zoukankan      html  css  js  c++  java
  • 同步和异步组合提交

    同步和异步组合提交

    一般情况下,针对偶尔出现的提价失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会成功,但如果这是发生在关闭消费者或者再均衡前的最后一次提交,就要确保能够提交成功。

    ​ 因此,在消费者关闭前一般都会组合使用commitAsync() commitSync() 他们的工作原理如下

    try{
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
    ",record.topic(),record.partition(),record.topic(),record.partition(),record.offset(),record.key(),record.value());
            }
            consumer.commitAsync();
        }
    }cache(Exception e){
        log.error("Unexpected error", e);
    }finally {
        try{
            consumer.commitSync();
        }finally{
            consumer.close();
        }
    }
    

    提交特定的偏移量

    提交偏移量的频率与处理消息批次的频率是一样的,但是如果想更频繁的提交该怎么办?如果poll()方法返回一大批数据,为了避免因再均衡引起的重复处理批消息,想要在批次中间提交偏移量怎么办? 这种情况无法通过调用commitSync()commitAsync() 来实现,因为他们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

    消费者api允许在调用commitSync()commitAsync()方法时提交的分区和偏移量的map,假设你处理了半个批次的消息,最后来自主题customers分区3的消息的偏移量是5000 你可以调用commitSync()来提交他,不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区。

    下面是提交特定偏移量的例子:

    private Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    ...
    while(true){
        ConsumerRecords<String,String> records = consumer.poll(100);
        for(ConsumerRecord<String,String> record : records){
            System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
    ",record.topic(),record.partition(),record.offset(),record.key(),record.value());
            currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset)+1,"no metadata"));
            if(count* 1000 == 0)
                consumer.commitAsync(currentOffsets,null);
            count++;
        }
    }
    
    
    
    
    
    
  • 相关阅读:
    找控件的父类
    silverlight和wpf中暴露 给子类override
    Oracle PLSQL 记录
    C#之TopShelf启动Windows服务 原文链接:https://blog.csdn.net/qq_36664495/java/article/details/90600995
    Super socket 记录知识
    oracle 查找字符位置 开始按照长度截取
    转 acl 库是啥、主要包含哪些功
    转自 posted on 2015-05-18 11:50 LitDev https://www.cnblogs.com/New-world/p/4511543.html
    dtu server 编译错误
    iOS 杂笔-22(万年一遇~一张图片对代理的理解)
  • 原文地址:https://www.cnblogs.com/xido/p/14607602.html
Copyright © 2011-2022 走看看