zoukankan      html  css  js  c++  java
  • Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较

    接上篇:Flink FileSink 自定义输出路径——BucketingSink 

    上篇使用BucketingSink 实现了自定义输出路径,现在来看看 StreamingFileSink( 据说是StreamingFileSink 是社区优化后添加的connector,推荐使用)

    StreamingFileSink 实现起来会稍微麻烦一点(也是灵活,功能更强大),因为可以自己实现序列化方法(源码里面有实例可以参考-复制)

    StreamingFileSink 有两个方法可以输出到文件  forRowFormat 和  forBulkFormat,名字差不多代表的方法的含义:行编码格式块编码格式

    forRowFormat 比较简单,只提供了 SimpleStringEncoder 写文本文件,可以指定编码,如下:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    
    val input: DataStream[String] = ...
    
    val sink: StreamingFileSink[String] = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) // 所有数据都写到同一个路径
        .build()
        
    input.addSink(sink)

    当然我们的主题还是根据输入数据自定义文件输出路径,就需要重写 DayBucketAssigner,如下:

    import java.io.IOException
    import java.nio.charset.StandardCharsets
    import org.apache.flink.core.io.SimpleVersionedSerializer
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    
    class DayBucketAssigner extends BucketAssigner[ObjectNode, String] {
    
      /**
        * bucketId is the output path
        * @param element
        * @param context
        * @return
        */
      override def getBucketId(element: ObjectNode, context: BucketAssigner.Context): String = {
        //context.currentProcessingTime()
        val day = element.get("date").asText("19790101000000").substring(0, 8)
        // wrap can use day + "/" + xxx
        day
      }
    
      override def getSerializer: SimpleVersionedSerializer[String] = {
    
        StringSerializer
      }
    
      /**
        * 实现参考 : org.apache.flink.runtime.checkpoint.StringSerializer
        */
      object StringSerializer extends SimpleVersionedSerializer[String] {
        val VERSION = 77
    
        override def getVersion = 77
    
        @throws[IOException]
        override def serialize(checkpointData: String): Array[Byte] = checkpointData.getBytes(StandardCharsets.UTF_8)
    
        @throws[IOException]
        override def deserialize(version: Int, serialized: Array[Byte]): String = if (version != 77) throw new IOException("version mismatch")
        else new String(serialized, StandardCharsets.UTF_8)
      }
    }

    在初始化sink 的时候,指定 BucketAssigner 就可以了

    val sinkRow = StreamingFileSink
          .forRowFormat(new Path("D:\idea_out\rollfilesink"), new SimpleStringEncoder[ObjectNode]("UTF-8"))
          .withBucketAssigner(new DayBucketAssigner)
         // .withBucketCheckInterval(60 * 60 * 1000l) // 1 hour
          .build()

    执行结果如下:

    2、 forBulkFormat 和forRowFormat 不太一样,需要自己实现 BulkWriterFactory 和  DayBulkWriter,自定义程度高,可以实现自己的  FSDataOutputStream,写出各种格式的文件(forRowFormat 自定义Encoder  也可以,但是如 forBuckFormat 灵活)

    // use define BulkWriterFactory and DayBucketAssinger
        val sinkBuck = StreamingFileSink
          .forBulkFormat(new Path("D:\idea_out\rollfilesink"), new DayBulkWriterFactory)
          .withBucketAssigner(new DayBucketAssigner())
          .build()

    实现如下:

    import java.io.File
    import java.nio.charset.StandardCharsets
    import org.apache.flink.api.common.serialization.BulkWriter
    import org.apache.flink.core.fs.FSDataOutputStream
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    import org.apache.flink.util.Preconditions
    
    /**
      * 实现参考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest
      */
    class DayBulkWriter extends BulkWriter[ObjectNode] {
    
      val charset = StandardCharsets.UTF_8
      var stream: FSDataOutputStream = _
    
      def DayBulkWriter(inputStream: FSDataOutputStream): DayBulkWriter = {
        stream = Preconditions.checkNotNull(inputStream);
        this
      }
    
      /**
        * write element
        *
        * @param element
        */
      override def addElement(element: ObjectNode): Unit = {
        this.stream.write(element.toString.getBytes(charset))
        // wrapthis.stream.write('
    ')
      }
    
      override def flush(): Unit = {
        this.stream.flush()
      }
    
      /**
        * output stream is input parameter, just flush, close is factory's job
        */
      override def finish(): Unit = {
        this.flush()
      }
    
    }
    
    /**
      * 实现参考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest.TestBulkWriterFactory
      */
    class DayBulkWriterFactory extends BulkWriter.Factory[ObjectNode] {
      override def create(out: FSDataOutputStream): BulkWriter[ObjectNode] = {
        val dayBulkWriter = new DayBulkWriter
        dayBulkWriter.DayBulkWriter(out)
    
      }
    }

    执行的结果就不赘述了

    又遇到个问题,StreamFileSink 没办法指定输出文件的名字。

    BucketingSink 和 StreamingFileSink 的不同

    从源码位置来说:

    BucketingSink 在 connector 下面,注重输出数据
    StreamingFileSink 在api 下面,注重与三方交互

    从版本来说:

    BucketingSink 比较早就有了
    StreamingFileSink 是1.6版本推出的功能(据说是优化后推出的)

    从支持的文件系统来说:

    BucketingSink     支持Hadoop 文件系统支持的所有文件系统(原文:This connector provides a Sink that writes partitioned files to any filesystem supported by Hadoop FileSystem)
    StreamingFileSink 支持Flink FileSystem 抽象文件系统   (原文:This connector provides a Sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction)

    从写数据的方式来说:

    BucketingSink     默认的Writer是StringWriter,也提供SequenceFileWriter(字符)
    StreamingFileSink 使用 OutputStream +  Encoder 对外写数据 (字节)

    从文件滚动策略来说:

    BucketingSink     提供了时间、条数滚动 
    StreamingFileSink 默认提供时间(官网有说条数,没看到 This is also configurable but the default policy rolls files based on file size and a timeout,自己实现BulkWriter可以)

    从目前(1.7.2)来说,BucketingSink 更开箱即用(功能相对简单),StreamingFileSink更麻烦(更灵活、强大)

    只是个初学者,还不太能理解 BucketingSink 和 StreamingFileSink 的差异,等了解之后,再来完善

    结论:比较推荐使用StreamingFileSink

    理由:功能强大,数据刷新时间更快(没有,BucketingSink默认60S的问题,详情见上篇,最后一段)

  • 相关阅读:
    关于unittest框架的传参问题
    爬虫的框架:Scarpy
    Robot Frameworke在python3上搭建环境以及快捷方式的创建
    安装第三方模块报错:read time out
    操作正则表达式遇到的问题
    gil锁 线程队列 线程池
    并发编程
    网络编程传输文件
    粘包现象
    UDP协议下的socket
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11198196.html
Copyright © 2011-2022 走看看