zoukankan      html  css  js  c++  java
  • Flink-1.10中的StreamingFileSink相关特性

    一切新知识的学习,都离不开官网得相关阅读,那么StreamingFileSink的官网介绍呢?

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/streamfile_sink.html

    flink在被阿里收购之后,官网也有了相当多的中文文档,英文不好的同学可以直接看中文版的,不过还是建议时间充足的同学直接阅读英文文档,毕竟现在的chrome中的划词翻译很是方便了,哪里不会点哪里,慢慢的开发中常见词汇也就能看个大概了。

    1. 写出文件的状态

    img

    看这个图片应该能明白,文件会分在不同的桶中,bucket中存在不同状态的文件:

    1. In-progress :当前文件正在写入中
    2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
    3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

    2. 简单的字符串写出示例

    DataStreamSource<String> lines = FlinkUtil.createSocketStream("localhost", 8888);
    
            StreamExecutionEnvironment env = FlinkUtil.getEnv();
            // 设置checkpoint
            env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
    
            OutputFileConfig config = OutputFileConfig
                    .builder()
                    .withPartPrefix("prefix")
                    .withPartSuffix(".txt")
                    .build();
    
    
            final StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                    /**
                     * 设置桶分配政策
                     * DateTimeBucketAssigner--默认的桶分配政策,默认基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH
                     * BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
                     */
                    .withBucketAssigner(new DateTimeBucketAssigner<>())
                    /**
                     * 有三种滚动政策
                     *  CheckpointRollingPolicy
                     *  DefaultRollingPolicy
                     *  OnCheckpointRollingPolicy
                     */
                    .withRollingPolicy(
                            /**
                             * 滚动策略决定了写出文件的状态变化过程
                             * 1. In-progress :当前文件正在写入中
                             * 2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
                             * 3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
                             *
                             * 观察到的现象
                             * 1.会根据本地时间和时区,先创建桶目录
                             * 2.文件名称规则:part-<subtaskIndex>-<partFileIndex>
                             * 3.在macos中默认不显示隐藏文件,需要显示隐藏文件才能看到处于In-progress和Pending状态的文件,因为文件是按照.开头命名的
                             *
                             */
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //设置滚动间隔
                                    .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //设置不活动时间间隔
                                    .withMaxPartSize(1024 * 1024 * 1024) // 最大零件尺寸
                                    .build())
                    .withOutputFileConfig(config)
                    .build();
    
            lines.addSink(sink).setParallelism(1);
    

    3. 写出文件的滚动策略

    数据写入文件时,查看源码可以知道
    滚动策略是这么判断的:
    没有处于inProgressPart状态的文件 或者 DefaultRollingPolicy.shouldRollOnEvent成立,即打开的文件大小超过了滚动器中设置的大小
    滚动文件时,首先关闭当前处于progress的part文件,然后创建一个新的 assembleNewPartPath,并且partCounter++(计数器)

    StreamingFileSink继承自RichSinkFunction,显然之后执行一次,
    该方法中注册了一个定时器,定时器的执行时间为currentProcessingTime + bucketCheckInterval
    其中bucketCheckInterval为调用StreamingFileSink.forRowFormat()时,默认创建的,其默认值为60000,也就是一分钟

    onProcessingTime方法继承自ProcessingTimeCallback,此方法使用调度触发器的时间戳调用。
    该方法中设定了60秒的定时器,定时每60秒执行一次该方法
    该方法中会调用buckets.onProcessingTime(currentTime)
    里面判断是否需要关闭part文件,注意是关闭而不是滚动
    判断条件为:part文件不为空 并且 DefaultRollingPolicy.shouldRollOnProcessingTime条件成立
    即part文件存在,并且 (当前时间-part的创建时间 >= 滚动时间 或者 当前时间-part的最后修改时间 >= 不活跃时间)

    snapshotState和initializeState方法继承自CheckpointedFunction,用来构建快照或者恢复历史状态
    其中snapshotState方法会调用buckets.snapshotState()方法,对桶的状态进行快照处理
    将所有处理活跃状态的桶全部进行快照处理,做快照时会检查是否需要滚动,滚动条件为:
    part文件不为空 并且 DefaultRollingPolicy.shouldRollOnCheckpoint成立,即文件大小超过设定
    满足该条件时,就会关闭partFile

    notifyCheckpointComplete方法继承自CheckpointListener,用来通知检查点完成
    该方法中会调用onSuccessfulCompletionOfCheckpoint方法
    会将已经关闭的(其实是处于Pending状态的文件)part文件重命名

  • 相关阅读:
    条件判断和循环
    list 和tuple的使用
    python的五大数据类型
    简单的一个程序,猜字游戏
    redhat7 nfs的配置以及auto自动挂载
    nmcli添加网卡 并且修改设备名字 添加IP地址
    RHEL7 系统ISCSI存储环境搭建
    Java分布式锁
    24个Jvm面试题总结及答案
    最新天猫3轮面试题目:虚拟机+并发锁+Sql防注入+Zookeeper
  • 原文地址:https://www.cnblogs.com/night-xing/p/12576919.html
Copyright © 2011-2022 走看看