  • Resetting Kafka consumer progress to the latest position

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    /**
     * Resets a consumer group's progress on a topic to the latest offsets.
     * Requires the address of one broker in the Kafka cluster, the name of the
     * topic to reset, the consumer group, and the number of partitions.
     */
    public static void seekLatest(String broker, String topic, String group, int partitionCnt) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        // Offsets are committed explicitly below, so auto-commit is turned off.
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        DefaultKafkaConsumerFactory<Long, byte[]> factory = new DefaultKafkaConsumerFactory<>(
                configProps, new LongDeserializer(), new ByteArrayDeserializer());

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (int i = 0; i < partitionCnt; i++) {
            topicPartitions.add(new TopicPartition(topic, i));
        }

        try (Consumer<Long, byte[]> consumer = factory.createConsumer()) {
            // Assign the partitions directly: seek() fails on a partition that is not
            // assigned, and subscribe() plus a 1 ms poll() does not guarantee assignment.
            consumer.assign(topicPartitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue());
            }
            // Persist the new positions for the group. This assumes no other member of
            // the group is active; otherwise the broker may reject the commit.
            consumer.commitSync();
        }
    }
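
    Resetting a group to the latest offsets skips every record between the previously committed offset and the end of each partition. The sketch below is not part of the original post. It estimates how many records such a reset would skip. The class name `ResetImpact`, the method `skippedRecords`, and the use of plain partition numbers as map keys are assumptions made for illustration. In a real tool the two maps would be filled from the group's committed offsets and from `consumer.endOffsets(...)`.

```java
import java.util.HashMap;
import java.util.Map;

public class ResetImpact {

    /**
     * For each partition in endOffsets, returns how many records a reset to the
     * latest offset would skip: endOffset - committedOffset, never below zero.
     * Partitions without a committed offset are reported as 0, on the assumption
     * that auto.offset.reset=latest would start them at the end anyway.
     */
    public static Map<Integer, Long> skippedRecords(Map<Integer, Long> committed,
                                                    Map<Integer, Long> endOffsets) {
        Map<Integer, Long> skipped = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long c = committed.get(e.getKey());
            long lag = (c == null) ? 0L : Math.max(0L, e.getValue() - c);
            skipped.put(e.getKey(), lag);
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from a real cluster.
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 120L);
        committed.put(1, 300L);

        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 150L);
        end.put(1, 300L);
        end.put(2, 40L);

        System.out.println(skippedRecords(committed, end));
    }
}
```

    Checking this figure before calling `seekLatest` makes it clear how much unread data the reset discards, which matters because the skipped records are not consumed by the group afterwards.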
  • Original article: https://www.cnblogs.com/ZhengQiZHou/p/12627921.html