zoukankan      html  css  js  c++  java
  • 同步和异步组合提交

    同步和异步组合提交

    一般情况下,针对偶尔出现的提价失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会成功,但如果这是发生在关闭消费者或者再均衡前的最后一次提交,就要确保能够提交成功。

    ​ 因此,在消费者关闭前一般都会组合使用commitAsync() commitSync() 他们的工作原理如下

    try{
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for(ConsumerRecord<String,String> record:records){
                System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
    ",record.topic(),record.partition(),record.topic(),record.partition(),record.offset(),record.key(),record.value());
            }
            consumer.commitAsync();
        }
    }cache(Exception e){
        log.error("Unexpected error", e);
    }finally {
        try{
            consumer.commitSync();
        }finally{
            consumer.close();
        }
    }
    

    提交特定的偏移量

    提交偏移量的频率与处理消息批次的频率是一样的,但是如果想更频繁的提交该怎么办?如果poll()方法返回一大批数据,为了避免因再均衡引起的重复处理批消息,想要在批次中间提交偏移量怎么办? 这种情况无法通过调用commitSync()commitAsync() 来实现,因为他们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

    消费者api允许在调用commitSync()commitAsync()方法时提交的分区和偏移量的map,假设你处理了半个批次的消息,最后来自主题customers分区3的消息的偏移量是5000 你可以调用commitSync()来提交他,不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区。

    下面是提交特定偏移量的例子:

    private Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    ...
    while(true){
        ConsumerRecords<String,String> records = consumer.poll(100);
        for(ConsumerRecord<String,String> record : records){
            System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
    ",record.topic(),record.partition(),record.offset(),record.key(),record.value());
            currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset)+1,"no metadata"));
            if(count* 1000 == 0)
                consumer.commitAsync(currentOffsets,null);
            count++;
        }
    }
    
    
    
    
    
    
  • 相关阅读:
    mahout协同过滤算法
    如何实现团队的自组织管理
    Trail: JDBC(TM) Database Access(3)
    JavaEE5 Tutorial_JavaBean,JSTL
    JavaEE5 Tutorial_Jsp,EL
    JavaEE5 Tutorial_Servlet
    J2SE7规范_2013.2_类
    J2SE7规范_2013.2_类型_命名
    Trail: JDBC(TM) Database Access(2)
    Trail: JDBC(TM) Database Access(1)
  • 原文地址:https://www.cnblogs.com/xido/p/14607602.html
Copyright © 2011-2022 走看看