zoukankan      html  css  js  c++  java
  • [hadoop源码阅读][6]org.apache.hadoop.ipcprotocol和心跳分析

    1.      protocolrpc中的作用

    通过对org.apache.hadoop.ipc包分析中,Hadoop实现了基于IPC模型的RPC机制,可以不需要像Java中实现的RMI机制一样,在RPC调用的C/S两端分别创建StubSkeleton,而是通过一组协议来进行RPC调用就可以实现通信。这主要是由于Hadoop所采用的序列化机制简化了RPC调用的复杂性。Hadoop定义了自己的通信协议,这些协议都是建立在TCP/IP协议之上的,规范了通信两端的约定。

    Hadoop集群中,不同进程之间通信需要使用合适的协议才能够进行交互。在协议接口中约定了通信双方的特定行为,那么,在实现这些通信协议的实现类中,就能看到指定进程是如何实现协议接口中约定的行为的。

     

    2.      hadoop实现了那些protocol

    所有要使用RPC服务的类都要实现该接口VersionedProtocol,我们可以来看一下有哪些接口继承了该接口。

    clip_image0023

    VersionedProtocol协议是Hadoop的最顶层协议接口的抽象;

    1HDFS相关

    • ClientDatanodeProtocolclientdatanode交互的接口,操作不多,只有一个block恢复的方法。那么,其它数据请求的方法呢?clientdatanode主要交互是通过流式的socket实现,源码在DataXceiver,在这里先不说了;
    • ClientProtocolclientNamenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
    • DatanodeProtocolDatanodeNamenode交互的接口,如心跳、blockreport等;
    • NamenodeProtocolSecondaryNodeNamenode交互的接口。

    2Mapreduce相关

    • InterDatanodeProtocolDatanode内部交互的接口,用来更新block的元数据;
    • InnerTrackerProtocolTaskTrackerJobTracker交互的接口,功能与DatanodeProtocol相似;
    • JobSubmissionProtocolJobClientJobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
    • TaskUmbilicalProtocolTask中子进程与母进程交互的接口,子进程即mapreduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

    3)其它

    • AdminOperationProtocol:不用用户操作的接口,提供一些管理操作,如刷新JobTrackernode列表;
    • RefreshAuthorizationPolicyProtocolRefreshUserMappingsProtocol:暂不明白。

    3.      心跳机制分析

    hadoop的集群是基于master/slave模式,namenodejobtracker属于master,而datanode/tasktracker属于slavesmaster只有一个,而slaves有多个。 namenodedatanode之间的通信,jobtrackertasktracker直接的通信,都是通过“心跳”完成的。

    心跳的机制大概是这样的:

    1) master启动的时候,会开一个ipc server在那里。

    2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。

    3.1   namenodedatanode的心跳

    datanode.java代码里面offerService方法中,每隔3秒向namenode发送心跳的代码

    public void offerService() throws Exception { while (shouldRun) { try { long startTime = now(); // 如果到了3秒钟,就向namenode发心跳 if (startTime - lastHeartbeat > heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name // -- data transfer port // -- Total capacity // -- Bytes remaining // lastHeartbeat = startTime; DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount() ); // 注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法?? // 处理对心跳的返回值(namenode传给datanode的指令) if (!processCommand(cmds)) continue; } } ... } // while (shouldRun) } // offerService

    现在的问题是datanode怎么获得namenode对象的?继续往下看

    public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable { ... public DatanodeProtocol namenode = null;//DatanodeProtocol ... void startDataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources) throws IOException { this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class, DatanodeProtocol.versionID, nameNodeAddr, conf);//DatanodeProtocol } } public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol, FSConstants, RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol //NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。 public interface DatanodeProtocol extends VersionedProtocol { ... public DatanodeRegistration register(DatanodeRegistration registration) throws IOException; /** * sendHeartbeat() tells the NameNode that the DataNode is still alive and well. Includes some status info, too. * It also gives the NameNode a chance to return an array of "DatanodeCommand" objects. * A DatanodeCommand tells the DataNode to invalidate local block(s), or to copy them to other DataNodes, etc. */ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount) throws IOException; public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException; ... }
    • 1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!!
    • 2) 上面这段代码中,protocol的类型是DatanodeProtocol.class
    • 3) 对namenode的所有调用,都被委托(delegate)给了Invoker

    剩下的就是和之前分析的client端和server端的流程一样了,总结一下流程就是:

    datanode向namenode发送heartbeat过程是这样的:

        a) 在datanode初始化获得namenode的proxy
        b) 在datanode上,调用namenode proxy的heartbeat方法:
            namenode.sendHeartbeat(dnRegistration,
                                                           data.getCapacity(),
                                                           data.getDfsUsed(),
                                                           data.getRemaining(),
                                                           xmitsInProgress.get(),
                                                           getXceiverCount());
        c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法
        d) client call方法将Invocation转化为Call对象
        e) client 将call发送到真正的namenode服务器
        f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!
        g) datanode接收结果,并将结果转化为DatanodeCommand[]

    3.2 jobtracker和tasktracker的心跳

    org.apache.hadoop.mapred.TaskTracker

    //代码一: State offerService() throws Exception { long lastHeartbeat = System.currentTimeMillis(); while (running && !shuttingDown) { // 发送心跳,调用代码二 HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); return State.NORMAL; } } // 代码二: HeartbeatResponse transmitHeartBeat(long now) throws IOException { HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId); return heartbeatResponse; }

    剩下的基本上和上面流程差不多....

    4.      用户自定义的protocol使用hadoop rpc

    MyProtocol.java

    import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; public interface MyProtocol extends VersionedProtocol { public Text println(Text t); }

    MyServer.java

    import java.io.IOException; import java.net.UnknownHostException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; public class MyServer implements MyProtocol { private Server server; public MyServer() { try { server = RPC.getServer(this, "localhost", 8888, new Configuration()); server.start(); server.join(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public Text println(Text t) { System.out.println(t); return new Text("finish"); } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 1; } public static void main(String[] args) { new MyServer(); } }

    MyClient.java

    import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; public class MyClient { private MyProtocol proxy; public MyClient() { InetSocketAddress addr = new InetSocketAddress("localhost", 8888); try { proxy = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr , new Configuration()); } catch (IOException e) { e.printStackTrace(); } } public void println(String s) { System.out.println(proxy.println(new Text(s))); } public void close() { RPC.stopProxy(proxy); } public static void main(String[] args) { MyClient c = new MyClient(); c.println("123"); c.close(); } }

    结果输出:

    在server端输出

    12/06/23 17:11:30 INFO ipc.Server: Starting Socket Reader #1 for port 6789 12/06/23 17:11:30 INFO metrics.RpcMetrics: Initializing RPC Metrics with hostName=MyServer, port=6789 12/06/23 17:11:30 INFO metrics.RpcDetailedMetrics: Initializing RPC Metrics with hostName=MyServer, port=6789 12/06/23 17:11:30 INFO ipc.Server: IPC Server Responder: starting 12/06/23 17:11:30 INFO ipc.Server: IPC Server listener on 6789: starting 12/06/23 17:11:30 INFO ipc.Server: IPC Server handler 0 on 6789: starting 123

    在client端输出

    finish

    5.      参考url

    http://www.wikieno.com/2012/02/hadoop-ipc-rpc/

    http://blog.csdn.net/shirdrn/article/details/4604229

    http://blog.csdn.net/shirdrn/article/details/4608377

    http://www.cnblogs.com/hiddenfox/archive/2011/12/30/2305786.html

  • 相关阅读:
    0101-ioc
    通过Android studio手动触发Android 上层GC(垃圾回收)的方法
    【Win10】BeyondCompare时提示"许可证密钥已被撤销"的解决办法
    Android Historian安装使用
    初探OpenCL之Mac OS上的hello world示例
    python画高斯分布图形
    深度学习优化算法总结
    《用Python玩转数据》项目—线性回归分析入门之波士顿房价预测(二)
    《用Python玩转数据》项目—线性回归分析入门之波士顿房价预测(一)
    (转)导数、偏导数、方向导数、梯度、梯度下降
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2559323.html
Copyright © 2011-2022 走看看