zoukankan      html  css  js  c++  java
  • 如何在flink中传递参数

    众所周知,flink作为流计算引擎,处理源源不断的数据是其本意,但是在处理数据的过程中,往往可能需要一些参数的传递,那么有哪些方法进行参数的传递?在什么时候使用?这里尝试进行简单的总结。

    • 使用configuration

      在main函数中定义变量

    1 // Class in Flink to store parameters
    2 Configuration configuration = new Configuration();
    3 configuration.setString("genre", "Action");
    4 
    5 lines.filter(new FilterGenreWithParameters())
    6         // Pass parameters to a function
    7         .withParameters(configuration)
    8         .print();

      使用参数的function需要继承自一个rich的function,这样才可以在open方法中获取相应的参数。

     1 class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {
     2 
     3     String genre;
     4 
     5     @Override
     6     public void open(Configuration parameters) throws Exception {
     7         // Read the parameter
     8         genre = parameters.getString("genre", "");
     9     }
    10 
    11     @Override
    12     public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
    13         String[] genres = movie.f2.split("\|");
    14 
    15         return Stream.of(genres).anyMatch(g -> g.equals(genre));
    16     }
    17 }
    • 使用ParameterTool

    使用configuration虽然传递了参数,但显然不够动态,每次参数改变,都涉及到程序的变更,既然main函数能够接受参数,flink自然也提供了相应的承接的机制,即ParameterTool。

    如果使用ParameterTool,则在参数传递上如下

     1 public static void main(String... args) {
     2     // Read command line arguments
     3     ParameterTool parameterTool = ParameterTool.fromArgs(args);
     4     
     5 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     6 env.getConfig().setGlobalJobParameters(parameterTool);
     7 ...
     8 
     9 // This function will be able to read these global parameters
    10 lines.filter(new FilterGenreWithGlobalEnv())
    11                 .print();
    12 }

    如上面代码,使用parameterTool来承接main函数的参数,通过env来设置全局变量来进行分发,那么在继承了rich函数的逻辑中就可以使用这个全局参数。

     1 class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {
     2 
     3     @Override
     4     public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
     5         String[] genres = movie.f2.split("\|");
     6         // Get global parameters
     7         ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
     8         // Read parameter
     9         String genre = parameterTool.get("genre");
    10 
    11         return Stream.of(genres).anyMatch(g -> g.equals(genre));
    12     }
    13 }
    • 使用broadcast变量

    在上面使用configuration和parametertool进行参数传递会很方便,但是也仅仅适用于少量参数的传递,如果有比较大量的数据传递,flink则提供了另外的方式来进行,其中之一即是broadcast,这个也是在其他计算引擎中广泛使用的方法之一。

     1 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
     2 // Get a dataset with words to ignore
     3 DataSet<String> wordsToIgnore = ...
     4 
     5 data.map(new RichFlatMapFunction<String, String>() {
     6 
     7     // A collection to store words. This will be stored in memory
     8     // of a task manager
     9     Collection<String> wordsToIgnore;
    10 
    11     @Override
    12     public void open(Configuration parameters) throws Exception {
    13         // Read a collection of words to ignore
    14         wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
    15     }
    16 
    17 
    18     @Override
    19     public String map(String line, Collector<String> out) throws Exception {
    20         String[] words = line.split("\W+");
    21         for (String word : words)
    22             // Use the collection of words to ignore
    23             if (wordsToIgnore.contains(word))
    24                 out.collect(new Tuple2<>(word, 1));
    25     }
    26     // Pass a dataset via a broadcast variable
    27 }).withBroadcastSet(wordsToIgnore, "wordsToIgnore");

    在第3行定义了需要进行广播的数据集,在第27行指定了将此数据集进行广播的目的地。

    广播的变量会保存在tm的内存中,这个也必然会使用tm有限的内存空间,也因此不能广播太大量的数据。

    那么,对于数据量更大的广播需要,要如何进行?flink也提供了缓存文件的机制,如下。

    • 使用distributedCache

    首先还是需要在定义dag图的时候指定缓存文件:

    1 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    2 
    3 // Register a file from HDFS
    4 env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel")
    5 
    6 ...
    7 
    8 env.execute()

    flink本身支持指定本地的缓存文件,但一般而言,建议指定分布式存储比如hdfs上的文件,并为其指定一个名称。

    使用起来也很简单,在rich函数的open方法中进行获取。

     1 class MyClassifier extends RichMapFunction<String, Integer> {
     2 
     3     @Override
     4     public void open(Configuration config) {
     5       File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
     6       ...
     7     }
     8 
     9     @Override
    10     public Integer map(String value) throws Exception {
    11       ...
    12     }
    13 }

    上面的代码忽略了对文件内容的处理。

    在上面的几个方法中,应该说参数本身都是static的,不会变化,那么如果参数本身随着时间也会发生变化,怎么办?

    嗯,那就用connectStream,其实也是流的聚合了。

    • 使用connectStream

    使用ConnectedStream的前提当然是需要有一个动态的流,比如在主数据之外,还有一些规则数据,这些规则数据会通过Restful服务来发布,假如我们的主数据来自于kafka,

    那么,就可以如下:

    1 DataStreamSource<String> input = (DataStreamSource) KafkaStreamFactory
    2                 .getKafka08Stream(env, srcCluster, srcTopic, srcGroup);
    3 
    4 DataStream<Tuple2<String, String>> appkeyMeta = env.addSource(new AppKeySourceFunction(), "appkey")
    5 
    6 ConnectedStreams<String, Tuple2<String, String>> connectedStreams = input.connect(appkeyMeta.broadcast());
    7 
    8 DataStream<String> cleanData = connectedStreams.flatMap(new DataCleanFlatMapFunction())

    其实可以看到,上面的代码中做了四件事,首先在第1行定义了获取主数据的流,在第4行定义了获取规则数据的流,在AppKeySourceFunction中实现了读取Restful的逻辑,

    在第6行实现了将规则数据广播到主数据中去,最后在第8行实现了从connectedStream中得到经过处理的数据。其中的关键即在于DataCleanFlatMapFunction。

    1 public class DataCleanFlatMapFunction extends RichCoFlatMapFunction<String, Tuple2<String, String>, String>{
    2 
    3 public void flatMap1(String s, Collector<String> collector){...}
    4 
    5 public void flatMap2(Tuple2<String, String> s, Collector<String> collector) {...}
    6 
    7 
    8 }

    这是一段缩减的代码,关键在于第一行,首先这个函数需要实现RichCoFlatMapFunction这个抽象类,其次在类实现中,flatMap2会承接规则函数,flatMap1会承接主函数。

    当然,参数可以从client发送到task,有时候也需要从task发回到client,一般这里就会使用accumulator。

    这里先看一个简单的例子,实现单词的计数以及处理文本的记录数:

     1 DataSet<String> lines = ...
     2 
     3 // Word count algorithm
     4 lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
     5     @Override
     6     public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
     7         String[] words = line.split("\W+");
     8         for (String word : words) {
     9             out.collect(new Tuple2<>(word, 1));
    10         }
    11     }
    12 })
    13 .groupBy(0)
    14 .sum(1)
    15 .print();
    16 
    17 // Count a number of lines in the text to process
    18 int linesCount = lines.count()
    19 System.out.println(linesCount);

    上面的代码中,第14行实现了单词的计算,第18行实现了处理记录的行数,但很可惜,这里会产生两个job,仅仅第18行一句代码,就会产生一个job,无疑是不高效的。

    flink提供了accumulator来实现数据的回传,亦即从tm传回到JM。

    flink本身提供了一些内置的accumulator:

    • IntCounterLongCounterDoubleCounter – allows summing together int, long, double values sent from task managers
    • AverageAccumulator – calculates an average of double values
    • LongMaximumLongMinimumIntMaximumIntMinimumDoubleMaximumDoubleMinimum – accumulators to determine maximum and minimum values for different types
    • Histogram – used to computed distribution of values from task managers

    首先需要定义一个accumulator,然后在某个自定义函数中来注册它,这样在客户端就可以获取相应的的值。

     1 lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
     2 
     3     // Create an accumulator
     4     private IntCounter linesNum = new IntCounter();
     5 
     6     @Override
     7     public void open(Configuration parameters) throws Exception {
     8         // Register accumulator
     9         getRuntimeContext().addAccumulator("linesNum", linesNum);
    10     }
    11 
    12     @Override
    13     public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
    14         String[] words = line.split("\W+");
    15         for (String word : words) {
    16             out.collect(new Tuple2<>(word, 1));
    17         }
    18         
    19         // Increment after each line is processed
    20         linesNum.add(1);
    21     }
    22 })
    23 .groupBy(0)
    24 .sum(1)
    25 .print();
    26 
    27 // Get accumulator result
    28 int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
    29 System.out.println(linesNum);

    当然,如果内置的accumulator不能满足需求,可以自定义accumulator,只需要继承两个接口之一即可,Accumulator或者SimpleAccumulato。

    上面介绍了几种参数传递的方式,在日常的使用中,可能不仅仅是使用其中一种,或许是某些的组合,比如通过parametertool来传递hdfs的路径,再通过filecache来读取缓存。

  • 相关阅读:
    这一年来,我的初三
    LGOJP4381 [IOI2008]Island
    BZOJ4484: [Jsoi2015]最小表示
    二分图染色及最大匹配(匈牙利算法)略解
    2019牛客多校第三场 F.Planting Trees
    性能优化 | 30个Java性能优化技巧,你会吗?
    进程 | 线程 | 当Linux多线程遭遇Linux多进程
    性能面试 | 性能测试常见面试题
    性能调优 | 如何通过性能调优突破 MySQL 数据库性能瓶颈?
    性能分析 | Java服务器内存过高&CPU过高问题排查
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/10362451.html
Copyright © 2011-2022 走看看