zoukankan      html  css  js  c++  java
  • Flink中TaskManager端执行用户逻辑过程(源码分析)

    TaskManager接收到来自JobManager的jobGraph转换得到的TDD对象,启动了任务,在StreamInputProcessor类的processInput()方法中

    通过一个while(true)中不停的拉取上游的数据,然后调用streamOperator.processElement(record)调用用户实现的方法去处理数据拉取的数据

    首先先来看下这个operator对象

    然后看看OneInputStreamOperator类的UML

    这里所有的实现类没有全部列出,只列了一些代表

    看到这里,写过Flink的streamAPI的同学,肯定感觉到很熟悉!!!!!!

    这里!不就是我们常写flink代码的那些算子嘛

    对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个OneInputStreamOperator,这里具体看一个最熟悉的Fliter

    来看一下StreamFilter的processElement方法

    !!!这里传入一个数据后,这个userFunction调用了filter方法并且把数据放进去了

    当返回true通过这个output.collect发送出去了

    这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userFunction包含了用户实现filter算子的逻辑

    (!!!!!就是说这个processElement方法会调用用户的逻辑)

    (所以这个userFunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientApi添加新功能方法,运行时可以通过这里拿到)

    继续

    来看看这个output.collect()方法

    然后

     

    看到这个,等等等等

    我不是从这个processElement()方法进来的吗,怎么又开始调processElement()方法了

    难道递归了? 不对不对

    这里operator不是上一个operator了,而是这个output对象的(这里是chainOutPut)

    看下这个output对象

    看下UML类图,也是只列举了重要的

    先看chainingOutPut的属性

     

    发现了又出现了OneInputStreamOperator对象

    看到这个实现类的名字!chain联想起了什么

    Flink会将可以chain在一起的算子在streamGraph转换成jobGraph的时候根据条件chain在一起

    一惊!

    来分别看一下ChainingOutPut和RecordWriterOutput的collect()方法有什么区别

    在chain中

     在RecordWriter中

    这里chain的ouput,又继续调用了下一个operator的processElement方法,然后又在processElement方法中又调用output.collect( ),collect中又调用了下一个operator的processElement方法

    整个过程就是个无限的循环,直到,某一个operator的ouput不为ChainingOutPut,当变为RecordWriterOutput时

    上面看到RecordWriterOutput的processElement直接emit发送出去了这个数据,再也没有继续调用processElement方法了

    这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了

    来看一下UML类图帮助理解

     里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)

    那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢

    回到TaskManager接收到JobManager的TDD以后初始化整个任务的时候

    StreamTask.java中invoke方法中

     先是初始化了一个OperatorChain,里面其实就是一个数组StreamOperator

    在他初始化的时候,其实就是为我们所有的streamOutputs设置了他的output以及会根据jobManager发送过来的TDD(包含信息)

    设置成对应的ChainingOutPut还是RecordWriterOutput,chainOutput会设置他的的operator

    然后获取了getHeadOperator()其实就是获取了他调用连中的第一个

    然后在

     

    将这个第一个operator关联到了inputProcessor对象里面

    后面就简单了在inputProcessor.processInput中就进入了while(true)循环拉取上游数据的逻辑

    然后

    在这里调用的第一个processElement方法就是我们的那个headOperator

    这样整个调用责任链就开始从第一个Operator运行起来了

  • 相关阅读:
    LeetCode Notes_#20 Valid Parentheses
    LeetCode Notes_#14 Longest Common Prefix
    牛客21天刷题_day#3
    牛客21天刷题_day#2
    牛客21天刷题_day#1
    WebGL编程指南
    《Redis 设计与实现》
    《女士品茶》
    《Java应用架构设计:模块化模式与OSGi》
    《编译与反编译技术实战》
  • 原文地址:https://www.cnblogs.com/ljygz/p/11504220.html
Copyright © 2011-2022 走看看