zoukankan      html  css  js  c++  java
  • Flink State 管理与恢复 之savePoint

    Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。
    Savepoints 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。

    1) 配置 Savepoints 的存储路径

    在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。

    state.savepoints.dir: hdfs:/node2/savepoints

    2) 在代码中设置算子 ID

    为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    object TestSaveByHdfs {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 默认并行度给所有算子适用,并行度<= slot数量
        env.setParallelism(1)
    
        // 3. 读取数据 sock流中的数据
        // DataStream 相当于spark中的Dstream
        val stream: DataStream[String] = env.socketTextStream("node1", 8888)
          .uid("stream-001")
        // 4. 转换和处理数据
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
          .uid("flatMap-001")
          .map((_, 1)).uid("map-001")
          .keyBy(0) // 分组算子,0或者1 代表下标,0代表单词,1代表单词出现的次数
          .sum(1).uid("sum-001") // 聚会累加算子
        result.print("结果")
        env.execute("wordcount")
      }
    }

    3) 触发 SavePoint

    //先启动Job
    [root@node1 bin]# ./flink run -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar
    //再取消Job ,触发SavePoint
    [root@node1 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce
    [root@node1 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce

    4) 从 SavePoint 启动 Job

    [root@hadoop101 bin]# ./flink run -s hdfs://node2/savepoints/savepoint-6ecb8c-e56ccb88576a -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar

    也可以通过Flink webUI启动job

  • 相关阅读:
    React 创建一个自动跟新时间的组件
    React 组件传值 父传递儿子
    React 以两种形式去创建组件 类或者函数(二)
    React 语法基础(一)之表达式和jsx
    ref的使用
    使用scale等比例缩放图片
    Vue动态加载图片图片不显示
    div里面的元素在【垂直 方向】上水平分布 使用calc()函数动态计算
    控制label标签的宽度,不让它换行 label标签左对齐
    表单验证
  • 原文地址:https://www.cnblogs.com/yj2434/p/14057893.html
Copyright © 2011-2022 走看看