zoukankan      html  css  js  c++  java
  • java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1566481621886_4397244_01_000004 timed out.

    根据异常信息中的 “Heartbeat of TaskManager with id” 和 “The heartbeat of ResourceManager with id”,在 Flink 源码中可以找到如下代码:

    /**
     * Heartbeat listener for TaskManager heartbeats. It reacts to timeouts through a
     * {@link JobMasterGateway}.
     *
     * <p>Incoming payload type is {@link AccumulatorReport}. The outgoing payload type is
     * {@code Void}, so nothing is sent back. A heartbeat timeout disconnects the TaskManager with a
     * {@link TimeoutException} whose message is "Heartbeat of TaskManager with id ... timed out.".
     */
    private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {
    
            // Gateway through which a timed-out TaskManager is disconnected.
            private final JobMasterGateway jobMasterGateway;
    
            /**
             * @param jobMasterGateway gateway used to disconnect TaskManagers on heartbeat timeout;
             *                         rejected if null (Preconditions.checkNotNull)
             */
            private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
                this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
            }
    
            /**
             * Called when the heartbeat of the given TaskManager timed out. It asks the job master
             * to disconnect that TaskManager, using a TimeoutException as the cause.
             *
             * @param resourceID id of the TaskManager whose heartbeat timed out
             */
            @Override
            public void notifyHeartbeatTimeout(ResourceID resourceID) {
                jobMasterGateway.disconnectTaskManager(
                    resourceID,
                    new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
            }
    
            /**
             * Forwards every accumulator snapshot in the received report to the scheduler.
             *
             * @param resourceID id of the TaskManager that sent the heartbeat
             * @param payload accumulator report carried by the heartbeat
             */
            @Override
            public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
                for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
                    // schedulerNG is a field of the enclosing class (not shown in this excerpt).
                    schedulerNG.updateAccumulators(snapshot);
                }
            }
    
            /**
             * Nothing is sent with outgoing heartbeats, so this returns an already-completed
             * future holding {@code null}.
             */
            @Override
            public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
                return CompletableFuture.completedFuture(null);
            }
        }
    
        /**
         * Heartbeat listener for ResourceManager heartbeats. No payload is exchanged in either
         * direction (both incoming and outgoing payload types are {@code Void}).
         */
        private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
    
            /**
             * Handles a ResourceManager heartbeat timeout. The handling is submitted via
             * {@code runAsync}. NOTE(review): this presumably runs it on the enclosing endpoint's main
             * thread, which would make reading {@code establishedResourceManagerConnection} safe;
             * confirm.
             *
             * <p>The timeout is always logged. A reconnect is only triggered when the timed-out id
             * matches the currently established ResourceManager connection. Timeouts for any other
             * id are only logged.
             *
             * @param resourceId id of the ResourceManager whose heartbeat timed out
             */
            @Override
            public void notifyHeartbeatTimeout(final ResourceID resourceId) {
                runAsync(() -> {
                    log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
    
                    // Only react if the timeout belongs to the connection currently held.
                    if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
                        reconnectToResourceManager(
                            new JobMasterException(
                                String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
                    }
                });
            }
    
            /** Ignores the incoming payload, which is always {@code Void}. */
            @Override
            public void reportPayload(ResourceID resourceID, Void payload) {
                // nothing to do since the payload is of type Void
            }
    
            /** Returns an already-completed future holding {@code null}; there is no outgoing payload. */
            @Override
            public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
                return CompletableFuture.completedFuture(null);
            }
        }

    然后在这里实例化 TaskManagerHeartbeatListener,并创建心跳管理器:

    // Sender-side heartbeat manager (createHeartbeatManagerSender): it actively sends heartbeats and
    // reports timeouts of its targets to TaskManagerHeartbeatListener. That listener's
    // notifyHeartbeatTimeout raises the "Heartbeat of TaskManager with id ... timed out." exception.
    // Interval and timeout come from heartbeatServices.
    this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(selfGateway),rpcService.getScheduledExecutor(),log);

    接着去 HeartbeatServices 里看看:

    /**
     * HeartbeatServices gives access to all services needed for heartbeating. This includes the
     * creation of heartbeat receivers and heartbeat senders.
     *
     * <p>Every heartbeat manager created by an instance uses its {@code heartbeatTimeout}. Sender
     * managers additionally use {@code heartbeatInterval}. The constructor enforces
     * {@code 0 < heartbeatInterval <= heartbeatTimeout}.
     *
     * <p>NOTE(review): the time unit is not stated in this class. Flink documents these values in
     * milliseconds; confirm against the version in use.
     */
    public class HeartbeatServices {
    
        /** Heartbeat interval for the created services. */
        protected final long heartbeatInterval;
    
        /** Heartbeat timeout for the created services. */
        protected final long heartbeatTimeout;
    
        /**
         * Creates heartbeat services with the given interval and timeout.
         *
         * @param heartbeatInterval heartbeat interval; must be greater than 0
         * @param heartbeatTimeout heartbeat timeout; must be greater than or equal to
         *                         {@code heartbeatInterval}
         * @throws IllegalArgumentException if either check fails (assuming Flink's
         *         {@code Preconditions.checkArgument} semantics)
         */
        public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
            Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
            Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat interval.");
    
            this.heartbeatInterval = heartbeatInterval;
            this.heartbeatTimeout = heartbeatTimeout;
        }
    
        /**
         * Creates a heartbeat manager which does not actively send heartbeats.
         *
         * @param resourceId Resource Id which identifies the owner of the heartbeat manager
         * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
         *                          targets
         * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
         * @param log Logger to be used for the logging
         * @param <I> Type of the incoming payload
         * @param <O> Type of the outgoing payload
         * @return A new HeartbeatManager instance
         */
        public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {
    
            // Receiver side: only the timeout is passed; the interval is not used here.
            // NOTE(review): the same executor is passed for both executor arguments of
            // HeartbeatManagerImpl; their distinct roles are not visible in this excerpt.
            return new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                resourceId,
                heartbeatListener,
                scheduledExecutor,
                scheduledExecutor,
                log);
        }
    
        /**
         * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
         *
         * @param resourceId Resource Id which identifies the owner of the heartbeat manager
         * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
         *                          targets
         * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
         * @param log Logger to be used for the logging
         * @param <I> Type of the incoming payload
         * @param <O> Type of the outgoing payload
         * @return A new HeartbeatManager instance which actively sends heartbeats
         */
        public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {
    
            // Sender side: needs both the interval (to send heartbeats) and the timeout.
            return new HeartbeatManagerSenderImpl<>(
                heartbeatInterval,
                heartbeatTimeout,
                resourceId,
                heartbeatListener,
                scheduledExecutor,
                scheduledExecutor,
                log);
        }
    
        /**
         * Creates an HeartbeatServices instance from a {@link Configuration}.
         *
         * <p>Reads {@code HeartbeatManagerOptions.HEARTBEAT_INTERVAL} and
         * {@code HeartbeatManagerOptions.HEARTBEAT_TIMEOUT} (key {@code heartbeat.timeout}). The
         * values are validated by the constructor, so a timeout smaller than the interval is
         * rejected here.
         *
         * @param configuration Configuration to be used for the HeartbeatServices creation
         * @return An HeartbeatServices instance created from the given configuration
         */
        public static HeartbeatServices fromConfiguration(Configuration configuration) {
            long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    
            long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
    
            return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
        }
    }

    没错,超时时间就由 HeartbeatManagerOptions.HEARTBEAT_TIMEOUT 决定:

        /**
         * Timeout for requesting and receiving heartbeat for both sender and receiver sides.
         *
         * <p>Config key {@code heartbeat.timeout}, default {@code 50000}. It must be greater than or
         * equal to the heartbeat interval, otherwise the {@code HeartbeatServices} constructor
         * rejects the configuration.
         *
         * <p>NOTE(review): the unit is not stated here; presumably milliseconds (50000 = 50 s).
         * Confirm.
         */
        public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
                key("heartbeat.timeout")
                .defaultValue(50000L)
                .withDescription("Timeout for requesting and receiving heartbeat for both sender and receiver sides.");

    心跳超时有可能是 YARN 压力比较大引起的。可以先暂时在 conf/flink-conf.yaml 中将这个值(默认 50000)调大一点,再观察。注意:heartbeat.timeout 不能小于 heartbeat.interval,否则 HeartbeatServices 构造时的参数校验会失败。

    #Timeout for requesting and receiving heartbeat for both sender and receiver sides.
    heartbeat.timeout: 180000
    欢迎关注微信公众号:大数据从业者
  • 相关阅读:
    力扣 503 :下一个更大元素 II
    力扣 684 :冗余连接
    时间空间复杂度
    并查集
    UE4解决贴花拉伸的简单办法
    《程序员修炼之道Ⅱ》读书简记
    mac中安装支持m1的软件,提示已损坏问题解决
    在idea中构建gradle项目报错Command line is too long
    向上向下取整
    web端开发工具下载地址
  • 原文地址:https://www.cnblogs.com/felixzh/p/14891620.html
Copyright © 2011-2022 走看看