zoukankan      html  css  js  c++  java
  • flink 读取kafka 数据,partition分配

    每个并发有个编号,只会读取kafka partition  % 总并发数 == 编号 的分区
     
    如: 6 分区, 4个并发
    分区: p0 p1 p2 p3 p4 p5
    并发: 0 1 2 3 
     
    分区 p0 分配给并发 0 :    0 % 4 = 0
    分区 p1分配给并发1:    1 % 4 = 1
    分区 p2分配给并发2:    2 % 4 = 2
    分区 p3 分配给并发 3:    3 % 4 = 3
    分区 p4 分配给并发 0 :    4 % 4 = 0
    分区 p5 分配给并发 5 :    5 % 4 = 1
     
    源码解析:
    FlinkKafkaConsumerBase.java  458 行 open 方法:
    public void open(Configuration configuration) throws Exception {
    调用 AbstractPartitionDiscoverer 类的方法 allPartitions ,发现并分配分区:
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    发现订阅的全部分区,并移除部分分区:
    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
    KafkaTopicPartition nextPartition;
    while (iter.hasNext()) {
    nextPartition = iter.next();
    if (!setAndCheckDiscoveredPartition(nextPartition)) {
    iter.remove();
    }
    }
    添加还未发现的分区
    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
    discoveredPartitions.add(partition);

    return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }

    return false;
    }

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

    先发现订阅的主题的所有新分区,循环所有新分区,把还未发现的新分区添加到发现的分区列表,把还未发现的新分区的
    startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    求 startIndex(不知道干嘛), startIndex + 分区编号,对 并发数取余,
    返回的结果 与 当前sub task 的并发编号相比,如果不相等,就把分区从新分区列表中移除,最后剩下的分区就是当前并发
    读取的分区。
  • 相关阅读:
    NanoProfiler
    NanoProfiler
    Open Source Cassandra Gitbook for Developer
    Android Fragment使用(四) Toolbar使用及Fragment中的Toolbar处理
    Android Fragment使用(三) Activity, Fragment, WebView的状态保存和恢复
    Android Fragment使用(二) 嵌套Fragments (Nested Fragments) 的使用及常见错误
    Android Fragment使用(一) 基础篇 温故知新
    Set up Github Pages with Hexo, migrating from Jekyll
    EventBus源码解析 源码阅读记录
    Android M Permission 运行时权限 学习笔记
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/10636486.html
Copyright © 2011-2022 走看看