zoukankan      html  css  js  c++  java
  • 从KafkaConsumer看看Kafka(一)

      Kafka的消息模型为发布订阅模型,消息生产者将消息发布到主题(topic)中,一个或多个消费者订阅(消费)该主题消息并消费,此模型中发布到topic中的消息会被所有消费者所订阅到,先介绍Kafka消费模型,然后再通过KafkaConsumer原来了解它的业务流程,源码基于kafka 2.4;

    Kafka消费模型关键点:

      1、Kafka一个消费组(ConsumerGroup)中存在一个或多个消费者(Consumer),每个消费者也必须属于一个消费者组;
      2、消费者组(ConsumerGroup)中的消费者(Consumer)独占一个或多个分区(Partition);
      3、消费时每个分区(Partition)最多只有一个Consumer再消费;
      4、消费者组(ConsumerGroup)在Broker存在一个协调者(Coordinator)分配管理Consumer与Partition之间的对应关系。当两种中的Consumer或Partition发生变更时将会触发reblance(重新平衡),重新分配Consumer与Partition的对应关系;

    下面是Kafka消费者程序的示例:

    //配置Consumer
    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    //创建Consumer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    //订阅主题
     consumer.subscribe(Arrays.asList("foo", "bar"));
    //消费消息
         while (true) {
         ConsumerRecords<String, String> records = 
    consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
    

      在上面我们可以看到Kafka消费消息的整个流程:配置Consumer属性、订阅主题、拉取消费消息,基本流程知道了也就是这几个点,配置ConsumerId、自动提交offset、序列化、Kafka服务端地址,这就是Kafka最最最基础的配置,当然还有很多配置项可以到官网查看;

    消费者关键点

      Consumer程序主要分为三个部分:配置、订阅主题、拉取消息;从中也可以看到在消费前需要订阅某个主题、在前面我们提到Consumer实例需要与某个Partition绑定关联然后才能进行消费数据,下面我们透过官方提供的Consumer程序简单看看如何订阅主题、如何关联Consumer与Partition、如何拉取消息消费;

    订阅主题
      订阅主题可以说是Kafka消费的基础,下面先看看简化后的订阅方法:

    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        acquireAndEnsureOpen();
        try {
           //忽略部分代码
            if (topics.isEmpty()) {
                this.unsubscribe();
            } else {
                if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                    metadata.requestUpdateForNewTopics();
            }
        } finally {
            release();
        }
    }
    

      安全检查: Consumer注释中也说了KafkaConsumer为非线程安全的,从上也可看到acquireAndEnsureOpen的作用就是检查当前是否为多线程运行,确保Consumer只在一个线程中执行;
      设置订阅状态: SubscriptionState 对象的subscribe方法主要是设置ConsumerRelance监听器、设置所监听的主题;
      更新元数据: metadata对象维护了Kafka集群元数据子集,存储了Broker节点、Topic、Partition节点信息等;

      跟进metadata.requestUpdateForNewTopics方法发现最终调用了metadata对象的requestUpdate方法;

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.updateVersion;
    }
    

      此方法并没有什么实质性的动作,只是更新needUpdate属性为true;由于Kafka拉取数据时必须得到元数据信息否则无法知道broker、topic、Partition信息也就无法知道去哪个节点拉取数据;但此处并没有实质性的更新元数据请求,接下来我们看看拉取方法。

    拉取数据
      上一步订阅了主题,这时我们就可以从中拉取数据,跟踪代码最终进入了KafkaConsumer的poll方法;

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        //多线程检查
        acquireAndEnsureOpen();
        try {//省略代码
    //超时检查
                if (includeMetadataInTimeout) {
                    //请求更新元数据
                    if (!updateAssignmentMetadataIfNeeded(timer)) {
                        return ConsumerRecords.empty();
                    }
                } else {//省略代码
    			}
                //拉取数据
                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }
                    //调用消费者拦截器后返回
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            return ConsumerRecords.empty();
        } finally {
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
        }
    }
    

    此方法几个流程
    1、 多线程检查
    2、 超时检查
    3、 请求更新元数据
    4、 拉取数据
      此处我们比较关心的还是更新元数据与拉取数据,这里我们主要看看这两个流程的执行;

    请求更新元数据
      在updateAssignmentMetadataIfNeeded方法中调用coordinator对象的poll方法去更新元数据,并且调用updateFetchPositions方法用于刷新Consumer对应Partition对应的offset值;

    拉取数据
      数据的拉取在pollForFetches方法中;

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        //省略代码
        //从缓存区数据
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
    	//构造拉取请求发送
        fetcher.sendFetches();
    
        //省略代码
    	//发起拉取数据请求
        Timer pollTimer = time.timer(pollTimeout);
        client.poll(pollTimer, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasAvailableFetches();
        });
        timer.update(pollTimer.currentTimeMs());
    	//省略代码
    
        return fetcher.fetchedRecords();
    

    }

    pollForFetches方法执行逻辑:

      1、 从缓存取数据如有可用数据,直接返回;
      2、 构造请求对象fetches,一个节点node对应一个clientRequest对象,将其放入ConsumerNetworkClient对象的unsent属性中;
      3、 调用client对象poll方法,将上一步放入unsent属性的请求对象ClientRequest发送出去;
      4、 返回所拉取到的消息;

    Offset提交
      offset提交放在ConsumerCoordinator对象中,offset提交又分为自动提交与手动提交;当设置了enable.auto.commit==true且  autoCommitIntervalMs等于指定间隔时有这么几个时机会触发自动:

      1、 consumer对象close时,调用commitOffsetsSync触发同步的offset提交;
      2、 consumer对象poll时,调用commitOffsetsAsync触发异步的offset提交;
      3、 触发Partition与Topic 分配 assign时触发commitOffsetsAsync异步提交;
      4、 当发生relance或有Consumer加入Group时触发commitOffsetsSync方法同步提交;

    参考资料: http://kafka.apache.org

  • 相关阅读:
    list浅析
    C#尝试读取或写入受保护的内存。这通常指示其他内存已损坏(catch不起作用)
    浅析C#线程同步事件-WaitHandle
    C#操作xml方法1
    C#简单的操作csv文件
    C#的int类型?,??,~的意思,string类型空值赋值
    将多个exc表格汇总于一个表格中
    C#禁止双击标题栏等操作
    c#泛型
    c#session
  • 原文地址:https://www.cnblogs.com/softlin/p/11706157.html
Copyright © 2011-2022 走看看