zoukankan      html  css  js  c++  java
  • hadoop的心跳回忆

    hadoop的集群是基于master/slave模式,namenode和jobtracker属于master,而datanode/tasktracker属于slaves。master只有一个,而slaves有多个。 

    namenode与datanode之间的通信,jobtracker与tasktracker直接的通信,都是通过“心跳”完成的。 

    以前看过hadoop心跳原理的源代码,今天再回忆一下,呵呵,所以叫“心跳回忆”。 


    1、心跳机制 

    心跳的机制大概是这样的: 
    1) master启动的时候,会开一个ipc server在那里。 
    2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。 


    2、找到心跳的代码 

    拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳的代码: 

    Java代码  
    /** 
      * Main loop for the DataNode.  Runs until shutdown, 
      * forever calling remote NameNode functions. 
      */  
     public void offerService() throws Exception {  
          
       ...  
      
       //  
       // Now loop for a long time....  
       //  
      
       while (shouldRun) {  
         try {  
           long startTime = now();  
      
           //  
           // Every so often, send heartbeat or block-report  
           //  
             
    // 如果到了3秒钟,就向namenode发心跳  
           if (startTime - lastHeartbeat > heartBeatInterval) {  
             //  
             // All heartbeat messages include following info:  
             // -- Datanode name  
             // -- data transfer port  
             // -- Total capacity  
             // -- Bytes remaining  
             //  
             lastHeartbeat = startTime;  
             DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
                                                          data.getCapacity(),  
                                                          data.getDfsUsed(),  
                                                          data.getRemaining(),  
                                                          xmitsInProgress.get(),  
                                                          getXceiverCount());  
      
      // 注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法??  
      
             myMetrics.heartbeats.inc(now() - startTime);  
             //LOG.info("Just sent heartbeat, with name " + localName);  
      
      // 处理对心跳的返回值(namenode传给datanode的指令)  
             if (!processCommand(cmds))  
               continue;  
           }  
      
        // 这里省略很多代码  
    ...  
       } // while (shouldRun)  
     } // offerService  


    上面这段代码,如果是单机的程序,没什么值得奇怪的。但是,这是hadoop集群!datanode和namenode在2台不同的机器(或2个JVM)上运行!datanode机器竟然直接调用namenode的方法!这是怎么实现的?难道是传说中的RMI吗?? 

    下面我们主要就来分析这个方法调用的细节。 


    3、心跳的底层细节一:datanode怎么获得namenode对象的? 

    首先,DataNode类中,有一个namenode的成员变量: 

    Java代码  收藏代码
    1. public class DataNode extends Configured   
          implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {  
        ...  
        public DatanodeProtocol namenode = null;  
        ...   
      }  


    下面是NameNode类的定义: 

    Java代码  收藏代码
    1. public class NameNode implements ClientProtocol, DatanodeProtocol,  
                                       NamenodeProtocol, FSConstants,  
                                       RefreshAuthorizationPolicyProtocol {  
        ...   
      }  


    注意:NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。 

    那么,DataNode类是怎么获取到NameNode类的引用呢? 

    在Datanode端,为namenode变量赋值的代码: 

    Java代码  
    // connect to name node  
    this.namenode = (DatanodeProtocol)   
      RPC.waitForProxy(DatanodeProtocol.class,  
                       DatanodeProtocol.versionID,  
                       nameNodeAddr,   
                       conf);  



    在继续去RPC类中追踪: 

    Java代码  
    VersionedProtocol proxy =  
            (VersionedProtocol) Proxy.newProxyInstance(  
                protocol.getClassLoader(), new Class[] { protocol },  
                new Invoker(addr, ticket, conf, factory));  


    现在,明白了! 
    1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!! 
    2) 上面这段代码中,protocol的类型是DatanodeProtocol.class 
    3) 对namenode的所有调用,都被委托(delegate)给了Invoker 


    4、心跳的底层细节二:看看Invoker类 

    Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类: 

    Java代码  
    private static class Invoker implements InvocationHandler {  

    在这个类中,看invoke方法: 
      

    Java代码  
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
                ...  
      
                  ObjectWritable value = (ObjectWritable)  
                    client.call(new Invocation(method, args), address,   
                                method.getDeclaringClass(), ticket);  
                    ...  
                  return value.get();  
               }  


    所有的方法调用又被delegate给client的call方法了! 

    client是Invoker中的成员变量: 
      

    Java代码  
    private Client client;  


    所以可以看出:DatanodeProtocol中的每个方法调用,都被包装成一个Invocation对象,再由client.call()调用 


    5、心跳的底层细节三:Invocation类 

    Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类 

    没有什么业务逻辑方法,主要作用就是一个VO 


    6、心跳的底层细节四:client类的call方法 

    接下来重点看client类的call方法: 

    Java代码  
    public Writable call(Writable param, InetSocketAddress addr,   
                         Class<?> protocol, UserGroupInformation ticket)    
                         throws InterruptedException, IOException {  
      
      Call call = new Call(param);     
    // 将Invocation转化为Call  
      Connection connection = getConnection(addr, protocol, ticket, call);  
    // 连接远程服务器  
      connection.sendParam(call);                 // send the parameter  
    // 将“序列化”后的call发给过去  
      boolean interrupted = false;  
      synchronized (call) {  
        while (!call.done) {  
          try {  
            call.wait();                           // wait for the result  
    // 等待调用结果  
          } catch (InterruptedException ie) {  
            // save the fact that we were interrupted  
            interrupted = true;  
          }  
        }  
      
        if (interrupted) {  
          // set the interrupt flag now that we are done waiting  
          Thread.currentThread().interrupt();  
        }  
      
        if (call.error != null) {  
          if (call.error instanceof RemoteException) {  
            call.error.fillInStackTrace();  
            throw call.error;  
          } else { // local exception  
            throw wrapException(addr, call.error);  
          }  
        } else {  
          return call.value;  
    // 返回  
        }  
      }  
    }  



    7、现在,一目了然了 

    Java代码  
    datanode向namenode发送heartbeat过程是这样的:  
      
        a) 在datanode初始化获得namenode的proxy  
        b) 在datanode上,调用namenode proxy的heartbeat方法:  
            namenode.sendHeartbeat(dnRegistration,  
                                                           data.getCapacity(),  
                                                           data.getDfsUsed(),  
                                                           data.getRemaining(),  
                                                           xmitsInProgress.get(),  
                                                           getXceiverCount());  
        c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法  
        d) client call方法将Invocation转化为Call对象  
        e) client 将call发送到真正的namenode服务器  
        f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!  
        g) datanode接收结果,并将结果转化为DatanodeCommand[]  
              


    8、再看动态代理 

    动态代理:让“只有接口,没事对应的实现类”成为可能,因为具体方法的实现可以委托给另一个类!! 

    在这个例子中,就datanode而言,DatanodeProtocol接口是没有实现类的! 


    *** THE END ***

    原文地址:http://thinkinginhadoop.iteye.com/blog/709993

  • 相关阅读:
    LOJ2323. 「清华集训 2017」小 Y 和地铁 【搜索】【思维】【好】
    BZOJ2687 交与并/BZOJ2369 区间【决策单调性优化DP】【分治】
    BZOJ1563 NOI2009 诗人小G【决策单调性优化DP】
    LOJ6039. 「雅礼集训 2017 Day5」珠宝【决策单调性优化DP】【分治】【思维好题】
    BZOJ4709 Jsoi2011 柠檬【决策单调性+单调栈】
    BZOJ2216 Poi2011 Lightning Conductor 【决策单调性优化DP】
    BZOJ3675 Apio2014 序列分割 【斜率优化】
    BZOJ4566 Haoi2016 找相同字符【广义后缀自动机】
    51nod 1600 Simple KMP【后缀自动机+LCT】【思维好题】*
    linux usermod
  • 原文地址:https://www.cnblogs.com/aaronwxb/p/2687614.html
Copyright © 2011-2022 走看看