zoukankan      html  css  js  c++  java
  • Flink的keyby延时源码

    public class RecordWriter<T extends IOReadableWritable> {

    ==FullBuffer
    /**
    * This is used to send LatencyMarks to a random target channel.
    */
    public void randomEmit(T record) throws IOException, InterruptedException {
    sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
    RecordSerializer<T> serializer = serializers[targetChannel];

    SerializationResult result = serializer.addRecord(record);

    while (result.isFullBuffer()) {
    if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
    // If this was a full record, we are done. Not breaking
    // out of the loop at this point will lead to another
    // buffer request before breaking out (that would not be
    // a problem per se, but it can lead to stalls in the
    // pipeline).
    if (result.isFullRecord()) {
    break;
    }
    }
    BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

    result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
    targetPartition.flush(targetChannel);
    }
    }



    ==getBufferTimeout
    package org.apache.flink.streaming.runtime.tasks;
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>

    @VisibleForTesting
    public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
    StreamConfig configuration,
    Environment environment) {
    List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters = new ArrayList<>();
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
    Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());

    for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge edge = outEdgesInOrder.get(i);
    streamRecordWriters.add(
    createStreamRecordWriter(
    edge,
    i,
    environment,
    environment.getTaskInfo().getTaskName(),
    chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
    }
    return streamRecordWriters;

    }

    http://vinoyang.com/2016/12/30/flink-runtime-producer-result-partition/
    http://vinoyang.com/2017/01/04/flink-runtime-consumer-input-gate/
    http://vinoyang.com/2017/01/08/flink-runtime-netty-part-1/
    http://vinoyang.com/2017/01/12/flink-runtime-netty-part-2/
    http://vinoyang.com/2017/01/15/flink-runtime-netty-part-3/
    http://vinoyang.com/2016/12/14/flink-runtime-NetworkEnvironment/
    http://vinoyang.com/2016/12/28/flink-runtime-communicate-api/
    http://vinoyang.com/archives/2016/12/
    http://vinoyang.com/2016/12/20/flink-runtime-unified-data-exchange/
  • 相关阅读:
    【sqli-labs】 less61 GET -Challenge -Double Query -5 queries allowed -Variation4 (GET型 挑战 双查询 只允许5次查询 变化4)
    Spring overview
    Code First use dotConnect for MySQL
    讓 MySQL 能夠用 EF6
    Sublime Text 3 常用插件以及安装方法(转)
    EntityFramework 6.0< Code First > 连接 Mysql数据库(转)
    bootstrap 2.3版与3.0版的使用区别
    用google-code-prettify高亮代码
    MVC中的@Html.DisplayFor等方法如何控制日期的显示格式(转)
    给Jquery easyui 的datagrid 每行增加操作链接(转)
  • 原文地址:https://www.cnblogs.com/WCFGROUP/p/9206746.html
Copyright © 2011-2022 走看看