zoukankan      html  css  js  c++  java
  • [源码分析] 从FlatMap用法到Flink的内部实现

    [源码分析] 从FlatMap用法到Flink的内部实现

    0x00 摘要

    本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。

    0x01 Map vs FlatMap

    首先我们先从概念入手。

    自从响应式编程慢慢壮大以来,这两个单词现在越来越被大家熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。很多兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给大家简单讲解下。

    map

    它把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的数组流。然后返回这个新数据流。

    flatMap

    flat是扁平的意思。所以这个操作是:先映射(map),再拍扁(join)。

    flatMap输入可能是多个子数组流。所以flatMap先针对 每个子数组流的每个元素进行映射操作。然后进行扁平化处理,最后汇集所有进行扁平化处理的结果集形成一个新的列表(扁平化简而言之就是去除所有的修饰)。

    flatMap与map另外一个不一样的地方就是传入的函数在处理完后返回值必须是List。

    实例

    比如拿到一个文本文件之后,我们是按行读取,按行处理。如果要对每一行的单词数进行计数,那么应该选择Map方法,如果是统计词频,就应该选择flatMap方法。

    如果还不清楚,可以看看下面这个例子:

    梁山新进一批好马,准备给每个马军头领配置一批。于是得到函数以及头领名单如下:
    
    函数 = ( 头领 => 头领 + 好马 )
    五虎将 = List(关胜、林冲、秦明、呼延灼、董平 )
    八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 )
    
    // Map函数的例子
    利用map函数,我们可以得到 五虎将马军
    
    五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 )
    结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 )
    
    // flatMap函数的例子
    但是为了得到统一的马军,则可以用flatMap:
    
    马军头领 = List(五虎将,八骠骑)
    马军 = 马军头领.flatMap( 头领 => 头领 + 好马 ) 
    
    结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 )
    

    现在大家应该清楚了吧。接下来看看几个FlatMap的实例。

    Scala语言的实现

    Scala本身对于List类型就有map和flatMap操作。举例如下:

    val names = List("Alice","James","Apple")
    val strings = names.map(x => x.toUpperCase)
    println(strings)
    // 输出 List(ALICE, JAMES, APPLE)
    
    val chars = names.flatMap(x=> x.toUpperCase())
    println(chars)
    // 输出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
    

    Flink的例子

    以上是scala语言层面的实现。下面我们看看Flink框架是如何使用FlatMap的。

    网上常见的一个Flink应用的例子:

    //加载数据源
    val source = env.fromElements("china is the best country","beijing is the capital of china")
    
    //转化处理数据
    val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
    

    Flink源码中的例子

    case class WordWithCount(word: String, count: Long)
    
    val text = env.socketTextStream(host, port, '
    ')
    
    val windowCounts = text.flatMap { w => w.split("\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")
    
    windowCounts.print()
    

    上面提到的都是简单的使用,如果有复杂需求,在Flink中,我们可以通过继承FlatMapFunction和RichFlatMapFunction来自定义算子。

    函数类FlatMapFunction

    对于不涉及到状态的使用,可以直接继承 FlatMapFunction,其定义如下:

    @Public
    @FunctionalInterface
    public interface FlatMapFunction<T, O> extends Function, Serializable {
    	void flatMap(T value, Collector<O> out) throws Exception;
    }
    

    如何自定义算子呢,这个可以直接看看Flink中的官方例子

    // FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
    public class Tokenizer implements FlatMapFunction<String, String> {
      @Override
      public void flatMap(String value, Collector<String> out) {
        for (String token : value.split("\W")) {
          out.collect(token);
        }
      }
    }
    
    // [...]
    DataSet<String> textLines = // [...]
    DataSet<String> words = textLines.flatMap(new Tokenizer());
    

    Rich函数类RichFlatMapFunction

    对于涉及到状态的情况,用户可以使用继承 RichFlatMapFunction 类的方式来实现UDF。

    RichFlatMapFunction属于Flink的Rich函数类。从名称上来看,这种函数类在普通的函数类上增加了Rich前缀,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函数类,Rich函数类增加了:

    • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
    • close()方法:Flink在算子最后一次调用结束后执行这个方法,可以用来释放一些资源。
    • getRuntimeContext方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据

    FlatMap对应的RichFlatMapFunction如下:

    @Public
    public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
    	@Override
    	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
    }
    

    其基类 AbstractRichFunction 如下,可以看到主要是和运行时上下文建立了联系,并且有初始化和退出操作

    @Public
    public abstract class AbstractRichFunction implements RichFunction, Serializable {
      
    	private transient RuntimeContext runtimeContext;
    
    	@Override
    	public void setRuntimeContext(RuntimeContext t) {
    		this.runtimeContext = t;
    	}
    
    	@Override
    	public RuntimeContext getRuntimeContext() {
    			return this.runtimeContext;
    	}
    
    	@Override
    	public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext instanceof IterationRuntimeContext) {
    			return (IterationRuntimeContext) this.runtimeContext;
    		} 
    	}
    
    	@Override
    	public void open(Configuration parameters) throws Exception {}
    
    	@Override
    	public void close() throws Exception {}
    }
    

    如何最好的使用? 当然还是官方文档和例子最靠谱。

    因为涉及到状态,所以如果使用,你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

    状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。 但是我们也会看到一个例子。RichFunctionRuntimeContext 提供如下方法:

    • ValueState getState(ValueStateDescriptor)
    • ReducingState getReducingState(ReducingStateDescriptor)
    • ListState getListState(ListStateDescriptor)
    • AggregatingState getAggregatingState(AggregatingStateDescriptor)
    • FoldingState getFoldingState(FoldingStateDescriptor)
    • MapState getMapState(MapStateDescriptor)

    下面是一个 FlatMapFunction 的例子,展示了如何将这些部分组合起来:

    class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
    
      private var sum: ValueState[(Long, Long)] = _
    
      override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    
        // access the state value
        val tmpCurrentSum = sum.value
    
        // If it hasn't been used before, it will be null
        val currentSum = if (tmpCurrentSum != null) {
          tmpCurrentSum
        } else {
          (0L, 0L)
        }
    
        // update the count
        val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    
        // update the state
        sum.update(newSum)
    
        // if the count reaches 2, emit the average and clear the state
        if (newSum._1 >= 2) {
          out.collect((input._1, newSum._2 / newSum._1))
          sum.clear()
        }
      }
    
      override def open(parameters: Configuration): Unit = {
        sum = getRuntimeContext.getState(
          new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        )
      }
    }
    
    object ExampleCountWindowAverage extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
    
      env.fromCollection(List(
        (1L, 3L),
        (1L, 5L),
        (1L, 7L),
        (1L, 4L),
        (1L, 2L)
      )).keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
      // the printed output will be (1,4) and (1,5)
    
      env.execute("ExampleManagedState")
    }
    

    这个例子实现了一个简单的计数窗口。 我们把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

    0x03 从Flink源码入手看FlatMap实现

    FlatMap从Flink编程模型角度讲属于一个算子,用来对数据流或者数据集进行转换。从框架角度说,FlatMap是怎么实现的呢? 或者说FlatMap是怎么从用户代码转换到Flink运行时呢 ?

    1. DataSet

    首先说说 DataSet相关这套系统中FlatMap的实现。

    请注意,DataSteam对应的那套系统中,operator名字都是带着Stream的,比如StreamOperator。

    DataSet

    val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 这段代码调用的就是DataSet中的API。具体如下:

    public abstract class DataSet<T> {
      
    	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
        
    		String callLocation = Utils.getCallLocationName();
        
    		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
    		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
    	}
    }
    

    FlatMapOperator

    可以看出,flatMap @ DataSet 主要就是生成了一个FlatMapOperator,这个可以理解为是逻辑算子。其定义如下:

    public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
    
    	protected final FlatMapFunction<IN, OUT> function;
    	protected final String defaultName;
    
    	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
    		super(input, resultType);
    		this.function = function;
    		this.defaultName = defaultName;
    	}
    
    	@Override
    	protected FlatMapFunction<IN, OUT> getFunction() {
    		return function;
    	}
    
      // 这个translateToDataFlow就是生成计划(Plan)的关键代码
    	@Override
    	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
    		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
    		// create operator
    		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
    			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
    		// set input
    		po.setInput(input);
    		// set parallelism
    		if (this.getParallelism() > 0) {
    			// use specified parallelism
    			po.setParallelism(this.getParallelism());
    		} else {
    			// if no parallelism has been specified, use parallelism of input operator to enable chaining
    			po.setParallelism(input.getParallelism());
    		}
    		return po;
    	}
    }
    

    FlatMapOperator的基类如下:

    public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {
    
    }
    
    // Base class for operations that operates on a single input data set.
    public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {
      	private final DataSet<IN> input;
    }
    

    生成计划

    DataSet API所编写的批处理程序跟DataStream API所编写的流处理程序在生成作业图(JobGraph)之前的实现差别很大。流处理程序是生成流图(StreamGraph),而批处理程序是生成计划(Plan)并由优化器对其进行优化并生成优化后的计划(OptimizedPlan)。

    计划(Plan)以数据流(dataflow)的形式来表示批处理程序,但它只是批处理程序最初的表示,在一个批处理程序生成作业图之前,计划还会被进行优化以产生更高效的方案。Plan不同于流图(StreamGraph),它以sink为入口,因为一个批处理程序可能存在若干个sink,所以Plan采用集合来存储它。另外Plan还封装了批处理作业的一些基本属性:jobId、jobName以及defaultParallelism等。

    生成Plan的核心部件是算子翻译器(OperatorTranslation),createProgramPlan方法通过它来”翻译“出计划,核心代码如下

    public class OperatorTranslation {
      
       // 接收每个需遍历的DataSink对象,然后将其转换成GenericDataSinkBase对象
       public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
           List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
           //遍历sinks集合
           for (DataSink<?> sink : sinks) {
                //将翻译生成的GenericDataSinkBase加入planSinks集合*,对每个sink进行”翻译“
                planSinks.add(translate(sink));
            }
           //以planSins集合构建Plan对象
           Plan p = new Plan(planSinks);
           p.setJobName(jobName);
           return p;
        }
    
    	private <I, O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
        //会调用到 FlatMapOperator 的 translateToDataFlow
    	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    
      }
      
    }
    

    FlatMapOperatorBase就是生成的plan中的一员。

    public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
    	@Override
    	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
    		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
    		
    		FunctionUtils.setFunctionRuntimeContext(function, ctx);
    		FunctionUtils.openFunction(function, parameters);
    
    		ArrayList<OUT> result = new ArrayList<OUT>(input.size());
    
    		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
    		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
    
    		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
    
    		for (IN element : input) {
    			IN inCopy = inSerializer.copy(element);
    			function.flatMap(inCopy, resultCollector);
    		}
    
    		FunctionUtils.closeFunction(function);
    
    		return result;
    	}
    }
    

    而最后优化时候,则FlatMapOperatorBase会被优化成FlatMapNode。

    public class GraphCreatingVisitor implements Visitor<Operator<?>> {
    	public boolean preVisit(Operator<?> c) {
        else if (c instanceof FlatMapOperatorBase) {
    			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
    		}
      }
    }
    

    自此,FlatMap就被组合到 DataSet的 OptimizedPlan 中。下一步Flink会依据OptimizedPlan来生成 JobGraph。

    作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

    在运行状态下,如果上游有数据流入,则FlatMap这个算子就会发挥作用。

    2. DataStream

    对于DataStream,则是另外一套体系结构。首先我们找一个使用DataStream的例子看看。

    //获取数据: 从socket中获取
    val textDataStream = env.socketTextStream("127.0.0.1", 8888, '
    ')
    val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))
    
    //groupby: 按照指定的字段聚合
    val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
    windowDstram.sum("count").print()
    

    上面例子中,flatMap 调用的是DataStream中的API,具体如下:

    public class DataStream<T> {
      
    	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
        //clean函数用来移除FlatMapFunction类对象的外部类部分,这样就可以进行序列化
        //getType用来获取类对象的输出类型
    		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
    				getType(), Utils.getCallLocationName(), true);
    		return flatMap(flatMapper, outType);
    	}
      
      // 构建了一个StreamFlatMap的Operator
    	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
    	}  
      
      // 依次调用下去
    	@PublicEvolving
    	public <R> SingleOutputStreamOperator<R> transform(
    			String operatorName,
    			TypeInformation<R> outTypeInfo,
    			OneInputStreamOperator<T, R> operator) {
    		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    	}
      
    	protected <R> SingleOutputStreamOperator<R> doTransform(
    			String operatorName,
    			TypeInformation<R> outTypeInfo,
    			StreamOperatorFactory<R> operatorFactory) {
    		// read the output type of the input Transform to coax out errors about MissingTypeInfo
    		transformation.getOutputType();
        // 构建Transform对象,Transform对象中包含其上游Transform对象,这样上游下游就串成了一个Transform链。
    		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
    				this.transformation,
    				operatorName,
    				operatorFactory,
    				outTypeInfo,
    				environment.getParallelism());
    		@SuppressWarnings({"unchecked", "rawtypes"})
    		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
        // 将这Transform对象放入env的transform对象列表。
    		getExecutionEnvironment().addOperator(resultTransform);
        // 返回流
    		return returnStream;
    	}  
    }
    

    上面源码中的几个概念需要澄清。

    Transformation:首先,FlatMap在FLink编程模型中是算子API,在DataStream中会生成一个Transformation,即逻辑算子。

    逻辑算子Transformation最后会对应到物理算子Operator,这个概念对应的就是StreamOperator

    StreamOperator:DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。

    processElement()方法也是UDF的逻辑被调用的地方,例如FlatMapFunction里的flatMap()方法。

    public class StreamFlatMap<IN, OUT>
    		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
    		implements OneInputStreamOperator<IN, OUT> {
    
    	private transient TimestampedCollector<OUT> collector;
    
    	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
    		super(flatMapper);
    		chainingStrategy = ChainingStrategy.ALWAYS;
    	}
    
    	@Override
    	public void open() throws Exception {
    		super.open();
    		collector = new TimestampedCollector<>(output);
    	}
    
    	@Override
    	public void processElement(StreamRecord<IN> element) throws Exception {
    		collector.setTimestamp(element);
        // 调用用户定义的FlatMap
    		userFunction.flatMap(element.getValue(), collector);
    	}
    }
    

    我们可以看到,StreamFlatMap继承了AbstractUdfStreamOperator,从而间接继承了StreamOperator。

    public abstract class AbstractStreamOperator<OUT>
    		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
    }
    

    StreamOperator是根接口。对于 Streaming 来说所有的算子都继承自 StreamOperator。继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator。实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractUdfStreamOperator。

    从 API 到 逻辑算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink会依据StreamOperator来生成 JobGraph。

    作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。至此就完成了从用户业务代码到Flink运行系统的转化。

    0x04 参考

    Flink中richfunction的一点小作用

    【浅显易懂】scala中map与flatMap的区别

    Working with State

    flink简单应用: scala编写wordcount

    【Flink】Flink基础之实现WordCount程序(Java与Scala版本)

    Flink进阶教程:以flatMap为例,如何进行算子自定义

    Flink运行时之批处理程序生成计划

  • 相关阅读:
    MVVM
    vue-cli初始化项目2.x|3.x
    逻辑覆盖
    white box白盒测试
    black box黑盒测试
    总结回顾js arr的常见方法以及相关的使用场景(一)
    js 原生功底 (一)
    markdown 语法总结(一)
    阿里一面,面试官想看到的究竟是什么,带你揭秘!!!!
    关于Axios 源码你想了解的 在这儿
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/12594424.html
Copyright © 2011-2022 走看看