  • It turns out kafka-connect-hdfs talks to Hadoop HDFS through a single NameNode, which is scary... so I switched it to HA right away


    2017-08-16 11:57:28,237 WARN [org.apache.hadoop.hdfs.LeaseRenewer][458] - <Failed to renew lease for [DFSClient_NONMAPREDUCE_-1756242047_26] for 30 seconds. Will retry shortly ...>
    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)
        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1826)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1404)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewLease(FSNamesystem.java:4968)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewLease(NameNodeRpcServer.java:875)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.renewLease(AuthorizationProviderProxyClientProtocol.java:357)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewLease(ClientNamenodeProtocolServerSideTranslatorPB.java:633)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
        at org.apache.hadoop.ipc.Client.call(Client.java:1468)
        at org.apache.hadoop.ipc.Client.call(Client.java:1399)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
        at com.sun.proxy.$Proxy50.renewLease(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy51.renewLease(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:879)
        at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
        at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
        at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
        at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
        at java.lang.Thread.run(Thread.java:745)
    The URL in the warning message explains it clearly enough: https://s.apache.org/sbnn-error
    3.17. What does the message "Operation category READ/WRITE is not supported in state standby" mean?
    
    In an HA-enabled cluster, DFS clients cannot know in advance which namenode is active at a given time. So when a client contacts a namenode and it happens to be the standby, the READ or WRITE operation will be refused and this message is logged. The client will then automatically contact the other namenode and try the operation again. As long as there is one active and one standby namenode in the cluster, this message can be safely ignored.
    
    If an application is configured to contact only one namenode always, this message indicates that the application is failing to perform any read/write operation. In such situations, the application would need to be modified to use the HA configuration for the cluster. The jira HDFS-3447 deals with lowering the severity of this message (and similar ones) to DEBUG so as to reduce noise in the logs, but is unresolved as of July 2015.
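
    In other words, the process that runs the DFS client has to carry the full HA client configuration rather than a single NameNode address. Below is a minimal sketch of those client-side settings expressed as a Hadoop Configuration in Java; the nameservice name nameservice1 and the hosts nn1.example.com / nn2.example.com are placeholders, not values from this cluster, so substitute whatever your hdfs-site.xml defines.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class HaClientConfSketch {
      public static void main(String[] args) throws Exception {
        // Standard HDFS HA client keys; all values here are placeholders.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nameservice1");
        conf.set("dfs.nameservices", "nameservice1");
        conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "nn1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "nn2.example.com:8020");
        conf.set("dfs.client.failover.proxy.provider.nameservice1",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // With these settings the client locates the active NameNode itself and
        // retries against the other NameNode when a failover happens.
        FileSystem fs = FileSystem.newInstance(conf);
        System.out.println(fs.getUri()); // prints hdfs://nameservice1
        fs.close();
      }
    }

    In a real deployment these values normally come from the core-site.xml / hdfs-site.xml that the connector loads, rather than being set in code.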

    The change goes into HdfsStorage.class, the class in kafka-connect-hdfs that performs the actual HDFS operations:

    /**
     * Copyright 2015 Confluent Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     **/
    
    package io.confluent.connect.hdfs.storage;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.kafka.common.TopicPartition;
    
    import java.io.IOException;
    import java.net.URI;
    
    import io.confluent.connect.hdfs.wal.FSWAL;
    import io.confluent.connect.hdfs.wal.WAL;
    
    public class HdfsStorage implements Storage {
    
      private final FileSystem fs;
      private final Configuration conf;
      private final String url;
    
      public HdfsStorage(Configuration conf,  String url) throws IOException {
        // fs = FileSystem.newInstance(URI.create(url), conf);  // original: bound to a single NameNode URL
        fs = FileSystem.newInstance(conf);  // changed: let the HA-aware Configuration pick the filesystem
        this.conf = conf;
        this.url = url;
      }
    
      @Override
      public FileStatus[] listStatus(String path, PathFilter filter) throws IOException {
        return fs.listStatus(new Path(path), filter);
      }
    
      @Override
      public FileStatus[] listStatus(String path) throws IOException {
        return fs.listStatus(new Path(path));
      }
    
      @Override
      public void append(String filename, Object object) throws IOException {
    
      }
    
      @Override
      public boolean mkdirs(String filename) throws IOException {
        return fs.mkdirs(new Path(filename));
      }
    
      @Override
      public boolean exists(String filename) throws IOException {
        return fs.exists(new Path(filename));
      }
    
      @Override
      public void commit(String tempFile, String committedFile) throws IOException {
        renameFile(tempFile, committedFile);
      }
    
    
      @Override
      public void delete(String filename) throws IOException {
        fs.delete(new Path(filename), true);
      }
    
      @Override
      public void close() throws IOException {
        if (fs != null) {
          fs.close();
        }
      }
    
      @Override
      public WAL wal(String topicsDir, TopicPartition topicPart) {
        return new FSWAL(topicsDir, topicPart, this);
      }
    
      @Override
      public Configuration conf() {
        return conf;
      }
    
      @Override
      public String url() {
        return url;
      }
    
      private void renameFile(String sourcePath, String targetPath) throws IOException {
        if (sourcePath.equals(targetPath)) {
          return;
        }
        final Path srcPath = new Path(sourcePath);
        final Path dstPath = new Path(targetPath);
        if (fs.exists(srcPath)) {
          fs.rename(srcPath, dstPath);
        }
      }
    }
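
    With this change, FileSystem.newInstance(conf) derives the target filesystem from fs.defaultFS in the Hadoop Configuration instead of from the connector's hdfs.url, so as long as that Configuration carries the HA nameservice settings sketched above, lease renewals and writes fail over between the two NameNodes automatically.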

    Of course, the corresponding url configuration then has to become hdfs://nameservice/*, because that is what HA requires. The original requirement no longer applies; it read:

    // HDFS Group
      public static final String HDFS_URL_CONFIG = "hdfs.url";
      private static final String HDFS_URL_DOC =
          "The HDFS connection URL. This configuration has the format of hdfs:://hostname:port and "
          + "specifies the HDFS to export data to.";
      private static final String HDFS_URL_DISPLAY = "HDFS URL";

    Even though the url is no longer used to instantiate the storage, it is still needed when loading data into Hive:

    url = connectorConfig.getString(HdfsSinkConnectorConfig.HDFS_URL_CONFIG);
    topicsDir = connectorConfig.getString(HdfsSinkConnectorConfig.TOPICS_DIR_CONFIG);
    String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);

    @SuppressWarnings("unchecked")
    Class<? extends Storage> storageClass = (Class<? extends Storage>) Class
        .forName(connectorConfig.getString(HdfsSinkConnectorConfig.STORAGE_CLASS_CONFIG));
    storage = StorageFactory.createStorage(storageClass, conf, url);
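
    For completeness, here is a rough sketch of what the relevant connector properties could look like after the change. The nameservice name, topic, directories, and the hadoop.conf.dir path are assumptions for illustration, not values from the original setup:

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    topics=test_hdfs
    # the URL now points at the HA nameservice instead of a single NameNode host:port
    hdfs.url=hdfs://nameservice1
    # directory holding the cluster's core-site.xml and hdfs-site.xml with the HA client settings
    hadoop.conf.dir=/etc/hadoop/conf
    topics.dir=/topics
    logs.dir=/logs
    flush.size=3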


    
    
  • Original post: https://www.cnblogs.com/felixzh/p/7429500.html