  • Flink checkpoint: the global configuration does not take effect for window operations

    Background

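    • Flink version: 1.6.2
    • Cluster mode: Flink on YARN
    • The job reads data from Kafka, does some simple processing, and then processes it with a custom RichWindowFunction. At that point it fails with the following exception: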
    AsynchronousException{java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
        ... 6 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
        ... 5 more
    Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
        at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
        ... 7 more
    • Checkpoint-related settings in the global Flink configuration:
    fs.default-scheme: hdfs://@hadoop:9000/
    fs.hdfs.hadoopconf: hdfs:///flink/data/
    state.checkpoints.dir: hdfs:///flink/checkpoints/
    state.checkpoints.num-retained: 20
    state.savepoints.dir: hdfs:///flink/flink-savepoints/
    state.backend.fs.checkpoint.dir: hdfs:///flink/state/checkpoints/
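    The decisive line in the trace is the last `Caused by`: the snapshot of the window operator's keyed state is 5249873 bytes, while maxSize is 5242880 bytes. That limit is exactly 5 × 1024 × 1024 (5 MiB), the default per-state limit of Flink's memory-based `MemoryStateBackend`. The short Java sketch below only redoes that comparison with the two numbers from the log; the class and method names are made up for illustration and are not Flink API.

```java
/**
 * Illustrative re-creation of the size check reported in the stack trace.
 * Not Flink code: the names are hypothetical, only the numbers come from the log.
 */
public class MemoryStateLimitCheck {

    // 5 MiB: the value printed as maxSize in the exception message
    static final long MAX_STATE_SIZE = 5L * 1024 * 1024;

    // Size of the window operator's state printed as Size in the exception message
    static final long REPORTED_STATE_SIZE = 5_249_873L;

    /** Returns by how many bytes the state exceeds the limit (0 if it fits). */
    static long overLimitBy(long stateSize, long maxSize) {
        return Math.max(0L, stateSize - maxSize);
    }

    public static void main(String[] args) {
        long excess = overLimitBy(REPORTED_STATE_SIZE, MAX_STATE_SIZE);
        System.out.println("maxSize = " + MAX_STATE_SIZE + " bytes");
        System.out.println("state exceeds the limit by " + excess + " bytes");
    }
}
```

    Run as-is, it reports an overshoot of 6993 bytes. A margin that small suggests a modest growth in buffered window contents is enough to push a checkpoint over the limit.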

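    Point of confusion:

    The checkpoint storage location is configured globally, so the window operator's checkpoints should be stored there as well.
    Why, then, are the checkpoints still taken with the memory-based backend?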
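    A likely answer, offered as a hedged reading of the configuration above: none of those keys chooses a state backend; they only set directories. Flink picks the backend from the separate `state.backend` key (`jobmanager`, `filesystem` or `rocksdb`) and, when that key is missing, falls back to `MemoryStateBackend`, which matches the `MemCheckpointStreamFactory` frames in the trace. The sketch below is a simplified model of that lookup written for this post (it is not Flink's own loader code), fed with the settings listed above.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Simplified model (an assumption, not Flink's source) of how the cluster-wide
 * state backend is chosen from the global configuration when the job itself
 * does not call env.setStateBackend(...).
 */
public class StateBackendChoice {

    static String chooseBackend(Map<String, String> flinkConf) {
        String name = flinkConf.get("state.backend");
        if (name == null) {
            // No backend configured: fall back to the in-memory backend.
            // Directory keys such as state.checkpoints.dir do not change this.
            return "MemoryStateBackend";
        }
        switch (name.toLowerCase(Locale.ROOT)) {
            case "jobmanager":
                return "MemoryStateBackend";
            case "filesystem":
                return "FsStateBackend";
            case "rocksdb":
                return "RocksDBStateBackend";
            default:
                // Any other value is treated as a backend factory class name
                return name;
        }
    }

    /** The checkpoint-related settings from the configuration in this post. */
    static Map<String, String> authorsConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("state.checkpoints.dir", "hdfs:///flink/checkpoints/");
        conf.put("state.checkpoints.num-retained", "20");
        conf.put("state.savepoints.dir", "hdfs:///flink/flink-savepoints/");
        conf.put("state.backend.fs.checkpoint.dir", "hdfs:///flink/state/checkpoints/");
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = authorsConf();
        System.out.println("as configured:      " + chooseBackend(conf));

        // Adding the backend key is the cluster-wide counterpart of env.setStateBackend(...)
        conf.put("state.backend", "filesystem");
        System.out.println("with state.backend: " + chooseBackend(conf));
    }
}
```

    Under this model the existing settings resolve to `MemoryStateBackend`, and adding `state.backend: filesystem` next to `state.checkpoints.dir` resolves to `FsStateBackend`. That would be the cluster-wide counterpart of calling `env.setStateBackend(new FsStateBackend(...))` in a single job, and it would mean the window operator is not special: like every other stateful operator it used the memory backend, it simply held the largest state.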

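    Attempted fix:

    Set the state backend explicitly in the job code:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;

    // Write this job's checkpoints to HDFS instead of keeping them in JobManager memory
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/workFlowCheckpoint"));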

    Before/after comparison:

    HDFS directory after the fix:

    [Screenshot: HDFS checkpoint directory after the fix]

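    Remaining doubt:

    However, the FsStateBackend class is not marked as Deprecated in 1.6.2, so I would like to ask:
    Is there anything inaccurate about my fix? Or does the global checkpoint configuration simply not take effect for the window operator itself?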

  • Original article: https://www.cnblogs.com/huanghanyu/p/13271911.html