zoukankan      html  css  js  c++  java
  • Kafka消费者Demo

    Kafka消费者Demo

    依赖包:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.0</version>
            </dependency>

    源码:

    package com.kafka.jdbc;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.*;
    
    public class DefineCustomeConsumer {
    
        private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
    
        public static void main(String[] args) throws Exception {
            // 创建配置信息
            Properties props = new Properties();
            // Kafka 集群
            props.put("bootstrap.servers", "10.168.4.76:9093");
            // 消费者组,只要 group.id 相同,就属于同一个消费者组
            props.put("group.id", "group_gong");
            // 关闭自动提交 offset
            props.put("enable.auto.commit", "false");
            // Key 和 Value 的反序列化类
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 创建一个消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费者订阅主题
            consumer.subscribe(Arrays.asList("topic_gong"), new ConsumerRebalanceListener() {
                // 该方法会在 Rebalance 之前调用
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    commitOffset(currentOffset);
                }
    
                // 该方法会在 Rebalance 之后调用
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    currentOffset.clear();
                    for (TopicPartition partition : partitions) {
                        consumer.seek(partition, getOffset(partition));
                        // 定位到最近提交的 offset 位置继续消费
                    }
                }
            });
    //        while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);// 消费者拉取数据
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
                    currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
                }
                commitOffset(currentOffset);// 异步提交
                Thread.sleep(1000);
    //        }
        }
    
        // 获取某分区的最新 offset
        private static long getOffset(TopicPartition partition) {
            return 0;
        }
    
        // 提交该消费者所有分区的 offset
        private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        }
    
    }
  • 相关阅读:
    【转】清理vs2008中的最近打开项目
    GridView的RowCommand事件传两个或以上参数
    ASP.NET中获取DataList中的控件
    判断导入的Excel中的数据
    ASP.NET中获取GridView中的控件
    Oracle拼接序列
    asp.net常用到的字符串处理
    文本框只能输入数字代码
    Tomcat多域名,虚拟目录配置
    Windows2008 部署tomcat后,局域网访问不了的问题
  • 原文地址:https://www.cnblogs.com/gongxr/p/13268334.html
Copyright © 2011-2022 走看看