zoukankan      html  css  js  c++  java
  • Flink State 管理与恢复 之savePoint

    Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。
    Savepoints 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。

    1) 配置 Savepoints 的存储路径

    在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。

    state.savepoints.dir: hdfs:/node2/savepoints

    2) 在代码中设置算子 ID

    为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    object TestSaveByHdfs {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 默认并行度给所有算子适用,并行度<= slot数量
        env.setParallelism(1)
    
        // 3. 读取数据 sock流中的数据
        // DataStream 相当于spark中的Dstream
        val stream: DataStream[String] = env.socketTextStream("node1", 8888)
          .uid("stream-001")
        // 4. 转换和处理数据
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
          .uid("flatMap-001")
          .map((_, 1)).uid("map-001")
          .keyBy(0) // 分组算子,0或者1 代表下标,0代表单词,1代表单词出现的次数
          .sum(1).uid("sum-001") // 聚会累加算子
        result.print("结果")
        env.execute("wordcount")
      }
    }

    3) 触发 SavePoint

    //先启动Job
    [root@node1 bin]# ./flink run -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar
    //再取消Job ,触发SavePoint
    [root@node1 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce
    [root@node1 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce

    4) 从 SavePoint 启动 Job

    [root@hadoop101 bin]# ./flink run -s hdfs://node2/savepoints/savepoint-6ecb8c-e56ccb88576a -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar

    也可以通过Flink webUI启动job

  • 相关阅读:
    解决ftp的pasv模式下iptables设置问题
    linux修改运行中的脚本
    shell脚本——列出质数
    转载:tomcat设置https的两种方式
    Centos缺少ifconfig命令
    转载:MySQL 数据库设计总结
    转载:HTML5视频推流方案
    转载:Linux五种方案快速恢复你的系统
    转载:HT可视化案例
    转载:21种JavaScript设计模式最新记录(含图和示例)
  • 原文地址:https://www.cnblogs.com/yj2434/p/14057893.html
Copyright © 2011-2022 走看看