zoukankan      html  css  js  c++  java
  • flink 读取kafka 数据,partition分配

    每个并发有个编号,只会读取kafka partition  % 总并发数 == 编号 的分区
     
    如: 6 分区, 4个并发
    分区: p0 p1 p2 p3 p4 p5
    并发: 0 1 2 3 
     
    分区 p0 分配给并发 0 :    0 % 4 = 0
    分区 p1分配给并发1:    1 % 4 = 1
    分区 p2分配给并发2:    2 % 4 = 2
    分区 p3 分配给并发 3:    3 % 4 = 3
    分区 p4 分配给并发 0 :    4 % 4 = 0
    分区 p5 分配给并发 5 :    5 % 4 = 1
     
    源码解析:
    FlinkKafkaConsumerBase.java  458 行 open 方法:
    public void open(Configuration configuration) throws Exception {
    调用 AbstractPartitionDiscoverer 类的方法 allPartitions ,发现并分配分区:
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
    发现订阅的全部分区,并移除部分分区:
    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
    KafkaTopicPartition nextPartition;
    while (iter.hasNext()) {
    nextPartition = iter.next();
    if (!setAndCheckDiscoveredPartition(nextPartition)) {
    iter.remove();
    }
    }
    添加还未发现的分区
    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
    discoveredPartitions.add(partition);

    return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }

    return false;
    }

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

    先发现订阅的主题的所有新分区,循环所有新分区,把还未发现的新分区添加到发现的分区列表,把还未发现的新分区的
    startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    求 startIndex(不知道干嘛), startIndex + 分区编号,对 并发数取余,
    返回的结果 与 当前sub task 的并发编号相比,如果不相等,就把分区从新分区列表中移除,最后剩下的分区就是当前并发
    读取的分区。
  • 相关阅读:
    c如何弹出保存路径/保存文件对话框
    c++ 读写txt方法
    windows获取环境变量
    Block UI 获取treelist column值
    MFC中如何弹出选择文件/文件夹对话框(C++)
    What can change the CID of a NX license server?
    spring之AOP
    spring注解开发
    spring配置Bean
    spring之IOC和DI实现
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/10636486.html
Copyright © 2011-2022 走看看