今天看到有小伙伴在问,就想着自己实现一下。
问题: Flink FileSink根据输入数据指定输出位置,比如讲对应日期的数据输出到对应目录
输入数据:
20190716 输出到路径 20190716
20190717 输出到路径 20190717
20190718 输出到路径 20190718
目前flink 对与输出到文件有两种实现(write 算子不算,只能指定目录):Rolling File Sink 和 Streaming File Sink,
Rolling File Sink 的实现就是 BucketingSink,使用也很简单,直接指定路径就可以了,
也可以设置如:目录名称格式(按时间格式滚动),输出文件格式,文件大小、滚动间隔、文件前缀、后缀一类的
// the SequenceFileWriter only works with Flink Tuples import org.apache.flink.api.java.tuple.Tuple2 val input: DataStream[Tuple2[A, B]] = ... val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path") sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))) sink.setWriter(new SequenceFileWriter[IntWritable, Text]) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins input.addSink(sink)
当然,如果是这么简单,就不会有这篇博客了,下面进入主题
--------------------------------------
默认的 DateTimeBucketer 只能根据时间指定文件名的滚动是规则,没办法根据数据指定文件的输出位置,这需要实现 BasePathBucketer 自定义输出路径
实现如下:
import java.io.File import org.apache.flink.streaming.connectors.fs.Clock import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer import org.apache.hadoop.fs.Path /** * 根据实际数据返回数据输出的路径 */ class DayBasePathBucketer extends BasePathBucketer[String]{ /** * 返回路径 * @param clock * @param basePath * @param element * @return */ override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = { // yyyyMMdd val day = element.substring(1, 9) new Path(basePath + File.separator + day) } }
调用如下:
import java.io.File import java.text.SimpleDateFormat import com.venn.index.conf.Common import org.apache.flink.formats.json.JsonNodeDeserializationSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.fs.StringWriter import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.scala._ /** * 使用BucketingSink 实现 根据‘数据’自定义输出目录 */ object RollingFileSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sdf = new SimpleDateFormat("yyyyMMddHHmmss") val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp) val sink = new BucketingSink[String]("D:\idea_out\rollfilesink") sink.setBucketer(new DayBasePathBucketer) sink.setWriter(new StringWriter[String]) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, // sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
// sink.setInProgressPrefix("inProcessPre")
// sink.setPendingPrefix("pendingpre")
// sink.setPartPrefix("partPre")
env.addSource(source) .assignAscendingTimestamps(json => { sdf.parse(json.get("date").asText()).getTime }) .map(json => { json.get("date") + "-" + json.toString // 将日期拼接到前面,方便后面使用 }) .addSink(sink) env.execute("rollingFileSink") } }
测试数据如下:
{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"} {"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"} ... {"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"} {"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"} {"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}
测试结果如下:
可以看到,当天的数据都输出到当天对应的目录中。
遇到个问题:
这里有个问题,因为重写了BasePathBucketer,自定义了输出文件,所有会同时打开多个输出文件,带来文件刷新的问题,在当前文件写完后(这里的表现是:当天的数据以及全部流过,
下一天的文件以及开始写了),会发现当天的文件中的数据不全,因为数据还没有全部刷到文件,这个时候下一个文件又开始写了,会发现上一个文件还没刷完
猜想:
猜想:每个文件都有个输出缓冲,上一个文件最后一点数据还在缓冲区,下一个文件又使用新的缓冲区,没办法刷到上一个文件的数据,只有等缓冲区数据满、超时一类的操作触发刷写 ??
验证:
源码BucketingSink.closePartFilesByTime 默认每60秒或大于滚动时间间隔(batchRolloverInterval)(系统时间) 将当前park文件,将状态从 in-process 修改为 pending,随后关闭当前的part 文件,数据刷到磁盘
代码如下:
private void closePartFilesByTime(long currentProcessingTime) throws Exception { synchronized (state.bucketStates) { for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) { LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.", getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold); closeCurrentPartFile(entry.getValue()); } } } }
下篇: Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较
搞定