-
Accumulators
-
An Accumulator aggregates statistics across the distributed tasks of a job; the final result of an accumulator is only available after the job has finished.
-
Counters are the concrete accumulator implementations: IntCounter, LongCounter, and DoubleCounter.
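As a minimal standalone sketch of the three counter types (the class name CounterSketch and the sample values are just for illustration, not from the original code), each counter simply accumulates a primitive value:

import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;

public class CounterSketch {
    public static void main(String[] args) {
        IntCounter intCounter = new IntCounter();
        LongCounter longCounter = new LongCounter();
        DoubleCounter doubleCounter = new DoubleCounter();

        intCounter.add(1);       // e.g. count matching records
        longCounter.add(1024L);  // e.g. sum up byte sizes
        doubleCounter.add(0.5);  // e.g. sum up scores

        // getLocalValue() returns only this instance's local value;
        // in a real job, Flink merges the values of all parallel instances at job end.
        System.out.println(intCounter.getLocalValue());
    }
}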
-
Notes:
- The accumulator object must be created inside the operator (the user function).
- The accumulator is typically registered, under a chosen name, in the open() method of a Rich function.
- Once registered, the accumulator can be used anywhere inside that operator.
- The final value can only be obtained after the job has finished, from the JobExecutionResult object returned by env.execute(xxx).
-
Code:
package com.ronnie.flink.batch;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class AccumulatorTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the data source
        DataSet<String> text = env.readTextFile("data/textFile");
        MapOperator<String, String> hello = text.map(new RichMapFunction<String, String>() {
            // Create the accumulator object inside the operator
            IntCounter intCounter = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // Register the accumulator under a name
                getRuntimeContext().addAccumulator("my-accumulator", intCounter);
            }

            @Override
            public String map(String value) throws Exception {
                if (value.contains("hello")) {
                    intCounter.add(1);
                }
                return value;
            }
        });
        hello.writeAsText("data/my.txt", FileSystem.WriteMode.OVERWRITE);
        // The accumulator result is only available after the job has finished
        JobExecutionResult counter = env.execute("counter");
        Integer result = counter.getAccumulatorResult("my-accumulator");
        System.out.println(result);
    }
}
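Each parallel instance of the operator keeps its own local counter; Flink merges these partial values when the job completes, which is why the final result can only be read from the JobExecutionResult rather than inside the operator itself.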
-
-
Broadcast Variables
-
Broadcasting: a data set is broadcast by calling withBroadcastSet on an operator.
-
Access: the broadcast data can be read via getRuntimeContext().getBroadcastVariable.
-
Code:
package com.ronnie.flink.batch;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * A broadcast variable is attached to a specific operator by calling
 * .withBroadcastSet(whiteDs, "white-name") on it; only that operator
 * can access the broadcast variable, other operators cannot.
 */
public class BroadCastTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> textFile = environment.readTextFile("data/textfile");
        List<String> list = new ArrayList<String>();
        list.add("rust");
        list.add("swift");
        DataSource<String> whiteDs = environment.fromCollection(list);
        FilterOperator<String> f1 = textFile.filter(new RichFilterFunction<String>() {
            List<String> whiteNames = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Read the broadcast variable by the name it was registered under
                whiteNames = getRuntimeContext().getBroadcastVariable("white-name");
            }

            @Override
            public boolean filter(String value) throws Exception {
                for (String whiteName : whiteNames) {
                    if (value.contains(whiteName)) {
                        return true;
                    }
                }
                return false;
            }
        });
        // The f1 operator can access the broadcast variable.
        // Note: this name must match the one passed to getBroadcastVariable above.
        FilterOperator<String> f2 = f1.withBroadcastSet(whiteDs, "white-name");
        f2.print();
    }
}
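Keep in mind that a broadcast data set is materialized in memory on each node that uses it, so it should stay small; for large data sets a regular join is usually the better fit.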
-
-
Distributed Cache
-
Flink provides a distributed cache, similar to the one in Apache Hadoop.
-
When the program runs, Flink automatically copies the registered files or directories to the local file system of every worker.
-
A user function can then look up a file or directory under the name it was registered with and access it from the worker's local file system.
-
Code:
package com.ronnie.flink.batch;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.List;

public class DistributedCacheTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // Current project path
        String project_path = System.getProperty("user.dir");
        // The path can point to a local file or an HDFS file; HDFS paths start with hdfs://
        // Note: the registered name must match the one passed to getFile() below.
        environment.registerCachedFile("file:///" + project_path + "/data/textfile", "myFile");
        DataSource<String> elements = environment.fromElements("hadoop", "flink", "spark", "hbase");
        MapOperator<String, String> map = elements.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // Retrieve the cached file on the worker
                File myFile = getRuntimeContext().getDistributedCache().getFile("myFile");
                // Read the cached file
                List<String> list = FileUtils.readLines(myFile, "UTF-8");
                for (String line : list) {
                    System.out.println("[" + line + "]");
                }
            }

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        map.print();
    }
}
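The file is shipped to each worker once and cached there for the lifetime of the job; getFile(name) returns the local copy. registerCachedFile also has an overload with a third boolean argument that marks the cached file as executable, which is useful for distributing scripts.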
-