zoukankan      html  css  js  c++  java
  • kafka的暂停消费和重新开始消费问题

    //暂停kafka的消费 暂停分区的分配
    consumer.unsubscribe();//此处不取消订阅暂停太久会出现订阅超时的错误
    consumer.pause(consumer.assignment());



    //重新消费分区,此处不重新分配会出错
    this.open(null,null,null);
        if (null == consumer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", PropertiesUtil.getValue("bootstrap.servers"));
    // 消费者的组id
    props.put("group.id", constant.kafka_groupName);//Spider2
    props.put("enable.auto.commit", "false");
    // max.poll.interval.ms(官网给得默认值为3000)的意思为,当我们从kafkaServer端poll消息时,poll()的调用之间的最大延迟。
    // 这提供了消费者在获取更多记录之前可以空闲的时间量的上限。 如果在此超时到期之前未调用poll(),则认为使用者失败,并且消费
    // 者组将重新平衡以便将分区重新分配给其他消费者,而恰好这里我们设置了Thread.sleep(6000) > max.poll.interval.ms值,
    // 也就是我们在手动提交的时候,实际上分区信息已经被分配到整个消费者组里面的其它消费者了
    props.put("auto.commit.interval.ms", "3000");
    // 从poll(拉)的回话处理时长
    props.put("session.timeout.ms", "100000");
    props.put("request.timeout.ms", "200000");
    props.put("max.poll.records", "2");
    // poll的数量限制
    // props.put("max.poll.records", "100");
    /* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("group.name", UUID.randomUUID().toString().replaceAll("-", ""));
    consumer = new KafkaConsumer<String, String>(props);

    // 订阅主题列表topic
    //consumer.subscribe(Arrays.asList("test_input"));
    }
    //注册kafka rebalanceListener
    //consumer.subscribe(Arrays.asList("test_etl"), new ConsumerRebalanceListener(){

    listener = new ConsumerRebalanceListener(){
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
    consumer.commitSync(offsetsMap);
    consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(
    Collection<TopicPartition> partitions) {
    System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
    consumer.commitSync();
    offsetsMap.clear();
    }};

    consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), listener);
    consumer.resume(consumer.assignment());
  • 相关阅读:
    使用SetTimer函数为Delphi的Win控件设置时钟
    关于QT版本的安装配置的一些困惑
    Linux设备驱动开发详解-Note(11)--- Linux 文件系统与设备文件系统(3)
    邪恶的C++
    TControl的主要功能研究(属性,函数,事件)
    Delphi研究,对全局变量函数与OOP编程关系的一点体会 good
    QT 相关资源(书籍、论坛、博客等。。。)整理
    VC UI界面库大集合
    .net程序员求职简历
    C++著名程序库的比较
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/12172867.html
Copyright © 2011-2022 走看看