zoukankan      html  css  js  c++  java
  • kafka-connect-hdfs重启,进去RECOVERY状态,从hadoop hdfs拿租约,很正常,但是也太久了吧

    虽说这个算是正常现象,等的时间也太久了吧。分钟级了。这个RECOVERY里面的WAL有点多余。有这么久的时间,早从新读取kafka写入hdfs了。纯属个人见解。

    @SuppressWarnings("fallthrough")
        public boolean recover() {
            try {
                switch (state) {
                case RECOVERY_STARTED:
                    log.info("Started recovery for topic partition {}", tp);
                    pause();
                    nextState();
                case RECOVERY_PARTITION_PAUSED:
                    applyWAL();
                    nextState();
                case WAL_APPLIED:
                    truncateWAL();
                    nextState();
                case WAL_TRUNCATED:
                    resetOffsets();
                    nextState();
                case OFFSET_RESET:
                    resume();
                    nextState();
                    log.info("Finished recovery for topic partition {}", tp);
                    break;
                default:
                    log.error("{} is not a valid state to perform recovery for topic partition {}.", state, tp);
                }
            } catch (ConnectException e) {
                log.error("Recovery failed at state {}", state, e);
                setRetryTimeout(timeoutMs);
                return false;
            }
            return true;
        }
    2017-08-18 01:35:53,716 ERROR [io.confluent.connect.hdfs.TopicPartitionWriter][215] - <Recovery failed at state RECOVERY_PARTITION_PAUSED>
    org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file ******/log. Lease recovery is in progress. Try again later.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3113)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2905)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:88)
        at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
        at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:550)
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:198)
        at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:247)
        at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:289)
        at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:118)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:428)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:54)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:464)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:255)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:250)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:459)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:445)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:702)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:266)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file ******/log. Lease recovery is in progress. Try again later.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3113)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2905)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    
        at org.apache.hadoop.ipc.Client.call(Client.java:1468)
        at org.apache.hadoop.ipc.Client.call(Client.java:1399)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
        at com.sun.proxy.$Proxy50.append(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy51.append(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319)
        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
        at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
        at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
        at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
        ... 45 more
  • 相关阅读:
    java:UDP广播发送与接收数据报实现
    使用Python自由切分pdf文件提取任意页面
    python实现 -- 盆友圈九宫格
    三次登陆
    django 连接 已有数据库 导出 models表
    默认列表转字典
    代码中的去掉注释
    python 列表套字典 根据相同的key分组
    python 列表套列表去重,列表套字典去重
    Tengine更新安装
  • 原文地址:https://www.cnblogs.com/felixzh/p/7429521.html
Copyright © 2011-2022 走看看