场景描述,前两天给flink 程序做了savepoint 记录但是我在代码中改了flink程序的一个算子 这就导致了 我重新提交之前那个savepoint 的时候报错。报错信息如下
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://cn-northwest-1.compute.internal:8020/tmp/tmp/savepoint-a2dd6f-695e0cd41db7. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI. at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1132) at org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237) at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196) at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) ... 10 more
这句的大概的意思就是 我不能从我保存的checkpoint的位置进行读取数据 好像要指定什么参数。然后找了一圈发现问题
允许未恢复状态启动
$ bin/flink run -s :savepointPath -n [:runArgs]
默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态:
所以在恢复的时候,要加上一个 -n 参数 这个参数就是相当于重新check 状态。
我试了一下 这样的话 保存的数据也不会重复。这样 这个问题就得到了解决。