zoukankan      html  css  js  c++  java
  • Flink 的 keyBy 延时源码

    public class RecordWriter<T extends IOReadableWritable> {

    ==FullBuffer
    /**
     * Sends the given record to one randomly selected target channel.
     *
     * <p>This is used to send LatencyMarks. The channel index is obtained from
     * {@code rng.nextInt(numChannels)} and the record is then handed to
     * {@code sendToTarget}.
     *
     * @param record the record to emit
     * @throws IOException propagated from {@code sendToTarget}
     * @throws InterruptedException propagated from {@code sendToTarget}
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        final int randomChannel = rng.nextInt(numChannels);
        sendToTarget(record, randomChannel);
    }

    /**
     * Writes {@code record} to the channel with index {@code targetChannel}.
     *
     * <p>The record is added to the serializer registered for that channel. While
     * the serialization result reports a full buffer, a new buffer builder is
     * requested for the channel and serialization continues with it. Afterwards
     * the serializer must hold no remaining serialized data. If
     * {@code flushAlways} is set, the target partition is flushed for this channel.
     *
     * @param record the record to serialize and send
     * @param targetChannel index into {@code serializers} selecting the channel
     * @throws IOException declared by this method; raised by the calls it makes
     * @throws InterruptedException declared by this method; raised by the calls it makes
     */
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        final RecordSerializer<T> channelSerializer = serializers[targetChannel];
        SerializationResult status = channelSerializer.addRecord(record);

        while (status.isFullBuffer()) {
            // Stop as soon as the current buffer builder was finished AND the record
            // is already complete. Going around once more would request another
            // buffer before leaving the loop; that is not a problem per se, but it
            // can lead to stalls in the pipeline.
            // The short-circuit && keeps the call to tryFinishCurrentBufferBuilder
            // first and only then consults isFullRecord().
            if (tryFinishCurrentBufferBuilder(targetChannel, channelSerializer) && status.isFullRecord()) {
                break;
            }

            // Record not fully written yet: carry on in a freshly requested buffer builder.
            status = channelSerializer.continueWritingWithNextBufferBuilder(
                requestNewBufferBuilder(targetChannel));
        }

        // After the loop nothing may be left pending in the serializer.
        checkState(!channelSerializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }



    ==getBufferTimeout
    package org.apache.flink.streaming.runtime.tasks;
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>

    /**
     * Builds one stream record writer for every out edge of the given configuration.
     *
     * <p>The edges are taken from {@code configuration.getOutEdgesInOrder(...)} and the
     * writers are returned in that same order. Each writer is created via
     * {@code createStreamRecordWriter} with the edge, its position in the edge list,
     * the environment, the task name from the environment's task info, and the buffer
     * timeout of the chained config registered under the edge's source id.
     *
     * @param configuration stream config supplying the out edges and the chained task configs
     * @param environment environment supplying the user class loader and the task info
     * @return the created writers, one per out edge, in edge order
     */
    @VisibleForTesting
    public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
            StreamConfig configuration,
            Environment environment) {
        List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> writers = new ArrayList<>();
        List<StreamEdge> orderedOutEdges = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
        Map<Integer, StreamConfig> configsBySourceId =
            configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());

        // Position of the current edge within orderedOutEdges; passed on as the output index.
        int outputIndex = 0;
        for (StreamEdge outEdge : orderedOutEdges) {
            writers.add(
                createStreamRecordWriter(
                    outEdge,
                    outputIndex,
                    environment,
                    environment.getTaskInfo().getTaskName(),
                    // The buffer timeout comes from the config of the edge's source.
                    configsBySourceId.get(outEdge.getSourceId()).getBufferTimeout()));
            outputIndex++;
        }
        return writers;
    }

    http://vinoyang.com/2016/12/30/flink-runtime-producer-result-partition/
    http://vinoyang.com/2017/01/04/flink-runtime-consumer-input-gate/
    http://vinoyang.com/2017/01/08/flink-runtime-netty-part-1/
    http://vinoyang.com/2017/01/12/flink-runtime-netty-part-2/
    http://vinoyang.com/2017/01/15/flink-runtime-netty-part-3/
    http://vinoyang.com/2016/12/14/flink-runtime-NetworkEnvironment/
    http://vinoyang.com/2016/12/28/flink-runtime-communicate-api/
    http://vinoyang.com/archives/2016/12/
    http://vinoyang.com/2016/12/20/flink-runtime-unified-data-exchange/
  • 相关阅读:
    @Transactional 什么情况下会失效?
    如何主持一场专业的面试?
    MIT-HIB 心率数据库及相关
    hadoop中Writable类
    XXX.jar has no source attachment
    Win10Eclipse配置个人本地hadoop
    js去除两个数组中重复的元素
    JS找出两个数组中不相同的元素
    flex中order控制元素的排列顺序
    flex中align-self设置侧轴的某元素的对齐方式
  • 原文地址:https://www.cnblogs.com/WCFGROUP/p/9206746.html
Copyright © 2011-2022 走看看