zoukankan      html  css  js  c++  java
  • Hadoop源码分析7: IPC流程(1) 主要类

    1.服务器端主要类

    public abstractclass Server

    {

      public static final ByteBuffer HEADER =ByteBuffer.wrap("hrpc".getBytes());

      public static final byte CURRENT_VERSION =4;

      private static finalThreadLocalServer SERVER= new ThreadLocalServer();  

      private static finalThreadLocalCall CurCall = newThreadLocalCall();

      private String bindAddress;   

      private int port;           

      private int handlerCount;     

      private int readThreads;     

      private Class? extendsWritable paramClass;    

      private int maxIdleTime;   

      private int thresholdIdleConnections;     

      private Configuration conf;

      private int maxQueueSize;

      private final int maxRespSize;

      private int socketSendBufferSize;

      volatile private boolean running =true;

      privateBlockingQueueCall callQueue;

      privateListConnection connectionList = Collections.synchronizedList(newLinkedListConnection());

      private Listener listener= null;

      private Responder responder= null;

      private int numConnections = 0;

      private Handler[]handlers = null;

     

      //内部类Server.Call,包装请求参数

      private static class Call {

         private int id;                          // theclient's call id    

         private Writable param;                   // the parameter passed   

         private Connection connection;   

         private ByteBuffer response;     

      }   

     

    //内部类Server.Listener ,线程  

      private class Listener extendsThread {

         privateServerSocketChannel acceptChannel =null; //the accept channel

         privateSelector selector = null; //theselector that we use for the server

         private Reader[]readers = null;

         private int currentReader =0;

         private InetSocketAddressaddress; //the address we bind at

         private Random rand = newRandom();

         private longlastCleanupRunTime = 0; 

         private ExecutorServicereadPool;    

     

      //内部类Server.Listener.Reader 线程    

        privateclass Reader implementsRunnable {

           privatevolatile boolean adding = false;

           privateSelector readSelector =null;

       }  

     

       //内部类Server.Responder  , 线程  

        privateclass Responder extendsThread {

          privateSelector writeSelector;

          private intpending;     

       } 

     

    //内部类Server.Connection,而Client.Connection是线程 

        publicclass Connection {

          privateboolean rpcHeaderRead = false; // if initial rpc header isread

          privateboolean headerRead = false;  

          privateSocketChannel channel;

          privateByteBuffer data;

          privateByteBuffer dataLengthBuffer;

          privateLinkedListCall responseQueue;

          privatevolatile int rpcCount = 0; // number of outstanding rpcs

          privatelong lastContact;

          private intdataLength;

          privateSocket socket;

          privateString hostAddress;

          private intremotePort;

          privateInetAddress addr;

         ConnectionHeader header = new ConnectionHeader();

         Class<?> protocol;

          privateAuthMethod authMethod; 

       }  

     

      //内部类Server.Handler,线程 

      private class Handler extendsThread {

     }

     

    }

     

    2.客户端主要类

    public classClient {

      privateHashtable<ConnectionId, Connectionconnections = new Hashtable<ConnectionId,Connection>(); 

       privateClass? extendsWritable valueClass;  

       private intcounter;                        // counter for call ids

       privateAtomicBoolean running = new AtomicBoolean(true); // if clientruns

       finalprivate Configuration conf;

       privateSocketFactory socketFactory;          // how tocreate sockets

     

       private intrefCount = 1;


      //内部类Client.Call

       private class Call {

         int id;                                 // callid

         Writable param;                          //parameter

         Writable value;                          // value,null if error

         IOException error;                       // exception, null ifvalue

         boolean done; 

      }

       //内部类Client.Connection ,线程 ,而Server.Connection不是线程 

       private class Connection extendsThread {

            privateInetSocketAddress server;           // server ip:port

            privateString serverPrincipal;  // server's krb5principal name

            privateConnectionHeader header;            // connection header

            privatefinal ConnectionId remoteId;             // connection id

            privateAuthMethod authMethod; // authentication method

            private Socket socket = null;              // connected socket

            privateDataInputStream in;

            privateDataOutputStream out;

            private intrpcTimeout;

            private intmaxIdleTime;  

            private intmaxRetries; //the max. no. of retries for socket connections

            privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

            private intpingInterval; / 

            privateHashtable<Integer, Callcalls= new Hashtable<Integer, Call>();

            privateAtomicLong lastActivity = new AtomicLong(); 

            privateAtomicBoolean shouldCloseConnection = new AtomicBoolean();  

            private IOException closeException; // closereason  


           //内部类Client.Connection.PingInputStream

            private class PingInputStream extendsFilterInputStream {

          }


       }

       //内部类Client.ParallelCall 

       privateclass ParallelCall extends Call {

          privateParallelResults results;

          private intindex;

       }   

       //内部类Client.ParallelResults 

        private static classParallelResults {

           privateWritable[] values;

           privateint size;

           privateint count;

         //

        }

       

      //内部类Client.ConnectionId 

       static class ConnectionId {

          InetSocketAddress address;

          UserGroupInformationticket;

          Class<?>protocol;

          privatestatic final int PRIME = 16777619;

          privateint rpcTimeout;

          privateString serverPrincipal;

          privateint maxIdleTime;  

          privateint maxRetries; //the max. no. of retries for socketconnections

          privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

          privateint pingInterval; // how often sends ping to the server inmsecs

        }

        

    }

    3.RPC主要类

    public class RPC {

      private static ClientCache CLIENTS=newClientCache();


      //内部类RPC.ClientCache 

      static privateclass ClientCache {

         private MapSocketFactory, Client clients=  new HashMapSocketFactory, Client();

      }


     //内部类RPC.Invocation ,只是一个包装请求参数的普通类,不执行动态代理方法

       private static class Invocationimplements Writable, Configurable {

          privateString methodName;

          privateClass[] parameterClasses;

          privateObject[] parameters;

     

          privateConfiguration conf;

      }


      //内部类RPC.Invoker ,执行动态代理方法

       private static class Invoker implementsInvocationHandler {

           privateClient.ConnectionId remoteId;

           privateClient client;

           privateboolean isClosed = false;

       }

        //内部类RPC.VersionMismatch 

        public static classVersionMismatch extends IOException {

           privateString interfaceName;

           privatelong clientVersion;

           privatelong serverVersion;

       }  


       //内部类RPC.Server ,添加了两个成员  instance,verbose

      public static class Server extendsorg.apache.hadoop.ipc.Server {

            private Object instance;

            private boolean verbose;

       }



    }

    4.其他类

     

    //IPC所有类都要实现的接口

    publicinterface VersionedProtocol {

      publiclong getProtocolVersion(String protocol,  longclientVersion) throws IOException;

    }

     

     

    //连接头信息,包括protocoluserGroupInformation   authMethod三个成员变量

    class ConnectionHeader implementsWritable  { 

       private String protocol;

       private UserGroupInformation ugi =null;

       private AuthMethod authMethod; ......

     

    //访问状况,包括SUCCESSERRORFATAL

    enum Status{

      SUCCESS(0),

      ERROR(1),

      FATAL(-1);.......  

    }

     

    //包装IO异常

    publicclass RemoteException extendsIOException {

    }

     

  • 相关阅读:
    #JavaScript 闭包问题详解 #打倒心魔
    Typora + cnblog 图片自动上传 (超详细哦)
    #FUNCTION#CALL对象中的函数内作用域问题.md
    #windows #Github #HOST
    #######对象迭代器######
    #为什么不建议使用for...in 去遍历数组
    #前后端附件传输,去重的一种方式#解决方案
    #页面滚动刷新的实现原理 #下拉刷新#上拉刷新#drag to fresh
    自己动手实现一个阻塞队列
    APC注入
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276533.html
Copyright © 2011-2022 走看看