zoukankan      html  css  js  c++  java
  • Hadoop中客户端和服务器端的方法调用过程

    1、Java动态代理实例

    Java 动态代理一个简单的demo:(用以对比Hadoop中的动态代理)

    Hello接口:

    public interface Hello {  
          void sayHello(String to);  
          void print(String p);   
    }

    Hello接口的实现类:

    public class HelloImpl implements Hello { 
         
      
    public void sayHello(String to) { 
            System.out.println(
    "Say hello to " + to); 
        } 
         
      
    public void print(String s) { 
            System.out.println(
    "print : " + s); 
        } 
         
    }

    与代理类(HelloImpl类)相关联的InvocationHandler对象

    public class LogHandler implements InvocationHandler { 
         
      
    private Object dele; 
         
      
    public LogHandler(Object obj) { 
          
    this.dele = obj; 
        } 
         
      
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
            doBefore(); 
          
    //在这里完全可以把下面这句注释掉,而做一些其它的事情 
            Object result = method.invoke(dele, args); 
            after(); 
          
    return result; 
        } 
         
      
    private void doBefore() { 
            System.out.println(
    "before...."); 
        } 
         
      
    private void after() { 
            System.out.println(
    "after...."); 
        } 
    }

    最后测试代码如下:

    public class ProxyTest { 
     
      
    public static void main(String[] args) { 
            HelloImpl impl
    = new HelloImpl(); 
            LogHandler handler
    = new LogHandler(impl); 
          
    //这里把handler与impl新生成的代理类相关联 
            Hello hello = (Hello) Proxy.newProxyInstance(impl.getClass().getClassLoader(), impl.getClass().getInterfaces(), handler); 
             
          
    //这里无论访问哪个方法,都是会把请求转发到handler.invoke
            hello.print("All the test"); 
            hello.sayHello(
    "Denny"); 
        } 
     
    }
     

    2、Hadoop中的动态代理

    2.1、客户端方法调用过程

    IPC客户端的处理比动态代理实例稍微复杂:代理对象上的调用被InvocationHandler捕获后,请求被打包并通过IPC连接发送到服务器上,客户端等待并在服务器的处理应答到达后,生成并返回调用结果。IPC上的调用是个同步操作,即,线程会一直等待调用结束,才会开始后续处理;而网络的处理时异步的,请求发送后,不需要等待应答。客户端通过java的wait()/notify()机制简单地解决了异步网络处理和同步IPC调用的差异。

    Hadoop对外提供查询文件状态的接口,如下:

    public interface IPCQueryStatus extends VersionedProtocol {
        IPCFileStatus getFileStatus(String filename);
    }

    客户端通过如下代码调用:

    IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration());
    IPCFileStatus status = query.getFileStatus("	mp	estIPC");

    2.1.1、Client端动态代理实现

    在RPC的getProxy代码如下:

    public static VersionedProtocol getProxy(
          Class<? extends VersionedProtocol> protocol,
          long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
          Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
    
        ......
        VersionedProtocol proxy =
            (VersionedProtocol) Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class[] { protocol },
                new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
        ......
        return proxy;
        ......
      }

    需要制定一个InvocationHandler,对于所有的调用请求,这个InvocationHandler都是Invoke,如下:

    private static class Invoker implements InvocationHandler {
        private Client.ConnectionId remoteId;// 用来标示一个connection,用以复用
        private Client client;//最重要的成员变量,RPC客户端
        private boolean isClosed = false;
    
        public Invoker(Class<? extends VersionedProtocol> protocol,
            InetSocketAddress address, UserGroupInformation ticket,
            Configuration conf, SocketFactory factory,
            int rpcTimeout) throws IOException {
          this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
              ticket, rpcTimeout, conf);
          this.client = CLIENTS.getClient(conf, factory);//
        }
        ......
        
        public Object invoke(Object proxy, Method method, Object[] args)
          ......
          
          ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), remoteId);
          ......
          
          return value.get();
        }
    }

    在上面的代码中,client负责发送IPC请求,并获取结果,类似最上面demo中LogHandler中的dele。

    2.1.2、Client通过Connection发送IPC请求并获取结果

    如下为client.call方法调用Connection.sendParam发送IPC请求:

    public Writable call(Writable param, ConnectionId remoteId)  
                           throws InterruptedException, IOException {
        Call call = new Call(param);
        Connection connection = getConnection(remoteId, call);
        connection.sendParam(call);                 // send the parameter
        ...
        synchronized (call) {
          while (!call.done) {
            try {
              call.wait();                           // wait for the result
            } catch (InterruptedException ie) {
              ...
            }
          }
    
          ...
          if (call.error != null) {
            ...
            throw call.error;
            ...
          } else {
            return call.value;
          }
        }
    }

    connection.sendParam后,会再调用receiveMessage来获取返回结果。如下:

    private class Connection extends Thread {
        ......
        
        public void run() {
            ......
            while (waitForWork()) {//wait here for work - read or close connection
            receiveResponse();
            }
            ......
        }
        ......
        private void receiveResponse() {
          ......
          touch();
          
          try {
            int id = in.readInt();                    // try to read an id
            ......
            Call call = calls.get(id);
    
            int state = in.readInt();     // read call status
            if (state == Status.SUCCESS.state) {
              Writable value = ReflectionUtils.newInstance(valueClass, conf);
              value.readFields(in);                 // read value
              call.setValue(value);
              calls.remove(id);
            } else if (state == Status.ERROR.state) {
              call.setException(new RemoteException(WritableUtils.readString(in),
                                                    WritableUtils.readString(in)));
              calls.remove(id);
            } else if (state == Status.FATAL.state) {
              // Close the connection
              markClosed(new RemoteException(WritableUtils.readString(in), 
                                             WritableUtils.readString(in)));
            }
          } catch (IOException e) {
            markClosed(e);
          }
        }
    }

    connection会调用call的setValue或者setException,两个方法都会调用callComplete方法,来调用notify通知进程IPC调用已结束

    protected synchronized void callComplete() {
          this.done = true;
          notify();                                 // notify caller
        }
    
        public synchronized void setException(IOException error) {
          this.error = error;
          callComplete();
        }
        
        
        public synchronized void setValue(Writable value) {
          this.value = value;
          callComplete();
        }

    2.2、服务器端方法调用过程

    服务端由Listener接收。

    2.2.1、Listener接收IPC请求的工作过程

    Listener主要运行NIO选择器循环,并在Listener.doRead()方法中读取数据,Connection.readAndProcess()中恢复数据帧,然后调用processData().

    void Listener.doRead(SelectionKey key) throws InterruptedException {
        int count = 0;
        Connection c = (Connection)key.attachment();
        ...
        count = c.readAndProcess();
        ...
          
    }
    
    public int Connection.readAndProcess() throws IOException, InterruptedException {
        ......
        processOneRpc(data.array());
        ......
    }
    
    private void Connection.processOneRpc(byte[] buf) throws IOException,
            InterruptedException {
        if (headerRead) {
            processData(buf);
        } else {
            processHeader(buf);
            ......
        }
    }
    
    private void Connection.processData(byte[] buf) throws  IOException, InterruptedException {
        DataInputStream dis =
            new DataInputStream(new ByteArrayInputStream(buf));
        int id = dis.readInt();                    // try to read an id
    
        ......
        Writable param = ReflectionUtils.newInstance(paramClass, conf);//★??paramClass在哪儿设置的★在RPC.Server中,paramClass是Invocation,IPC调用传递的都是Invocation
        param.readFields(dis);        
    
        Call call = new Call(id, param, this);
        callQueue.put(call);              // queue the call; maybe blocked here
    }

    ProcessData反序列化调用参数,构造服务器端的Call对象。然后放入callQueue队列中。callQueue阻塞队列定义于Server类中,是Listener和Handler的边界。(生产者Listener消费者Handler)。

    2.2.2、Handler处理IPC请求的工作过程

    Handler主要工作都在run方法中完成。主循环中,每循环一次处理一个请求(通过调用Server的抽象方法call来完成)。

    public void run() {
        ......
        SERVER.set(Server.this);
        ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
        while (running) {
    
            final Call call = callQueue.take(); // 获取一个IPC调用
            ......
            String errorClass = null;
            String error = null;
            Writable value = null;
    
            CurCall.set(call);
            ......
            value = call(call.connection.protocol, call.param, 
                       call.timestamp);//实际代码用到jaas,这里简化
            ......
    
            CurCall.set(null);
            synchronized (call.connection.responseQueue) {
            ......
            setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error);
            ...
            responder.doRespond(call);//★?
            }
    
        }
    
    }

    Server.call调用后返回一个writable对象--value,然后通过调用setupResponse将结果序列化到call的Response成员变量中。

    private void setupResponse(ByteArrayOutputStream response, 
                                 Call call, Status status, 
                                 Writable rv, String errorClass, String error) 
      throws IOException {
        response.reset();
        DataOutputStream out = new DataOutputStream(response);
        out.writeInt(call.id);                // write call id
        out.writeInt(status.state);           // write status
    
        if (status == Status.SUCCESS) {
          rv.write(out);
        } else {
          WritableUtils.writeString(out, errorClass);
          WritableUtils.writeString(out, error);
        }
       ......
        call.setResponse(ByteBuffer.wrap(response.toByteArray()));
      }

    Server.call抽象方法的具体实现在RPC.Server中。代码如下:

    private Object instance;
    ......
    
    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
        throws IOException {
        
        Invocation call = (Invocation)param;
        
        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);
    
        Object value = method.invoke(instance, call.getParameters());
        
        return new ObjectWritable(method.getReturnType(), value);      
    
    }

    Handler所在线程是共享资源,当有一个IPC请求处理完后,即调用Response的doResponse返回结果,而不亲自返回,原因有二:

    1. 对共享资源的占用时间越短越好;

    2. IPC返回受网络通信时间影响,可能会占用很长时间。

    2.2.3、Response的工作过程

    doResponse的代码很简单,将Call放入IPC连接的应答队列中,如果应答队列为1,立即调用processResponse发放向客户端发送结果,(队列为1,表明此IPC连接比较空闲,直接发送,避免从Handler线程到Response线程的切换开销)

    void doRespond(Call call) throws IOException {
          synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
              processResponse(call.connection.responseQueue, true);
            }
          }
        }

    Response有一个类似于Listener的NIO选择器,用来处理当队列不为1时的发送。只是Listener关注OP_READ和OP_ACCEPT事件,而Response关注OP_WRITE事件。代码如下:

    public void run() {
    
          while (running) {
            
              waitPending();     // 等待通道登记
              writeSelector.select(PURGE_INTERVAL); // 等待通道可写
              Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                try {
                  if (key.isValid() && key.isWritable()) {
                      doAsyncWrite(key);//输出远程IPC调用结果
                  }
                } catch (IOException e) {
                }
              }
              ......
        }
    }
    
    private void doAsyncWrite(SelectionKey key) throws IOException {
          Call call = (Call)key.attachment();
          ......
          synchronized(call.connection.responseQueue) {
            if (processResponse(call.connection.responseQueue, false)) {//调用输出
              try {
                key.interestOps(0);//processResponse返回true,表示无等待数据,清楚兴趣操作集
              } catch (CancelledKeyException e) {
               ......
              }
            }
          }
    }
    
    
    private boolean processResponse(LinkedList<Call> responseQueue,
                                        boolean inHandler) throws IOException {
          ......
        synchronized (responseQueue) {
          ......
          int numBytes = channelWrite(channel, call.response);
          
     
          done = true;               // error. no more data for this channel.
          closeConnection(call.connection);
        }
      return done;
    }

    processResponse关键点:

    1. 可被Handler调用(当应答队列为1),参数inHandler为true,也可被Response调用,参数inHandler为false,表示队列为1或更多。

    2. 返回true,表示通道上无需要发送的数据。

    2.3总结

    IPC Client端,发送Client.Call(new Invocation(method,args), remoteId)

    --封装过程:Call.Id ,  Invocation---(查看Client.Connection.sendParam)

    IPC Server端,接收Server.Call(Id, Invocation, Connction)---封装过程:Call.Id,Invocation--(查看Server.Connction.processData)

  • 相关阅读:
    每天一个linux命令(1):ls命令
    如何查看和停止Linux启动的服务
    JavaScript作用域原理——作用域根据函数划分
    iOS 自动布局详细介绍
    arc下内存泄漏的解决小技巧
    AFNetwork2.0在报错1016,3840的解决方法及一些感悟
    iOS聊天下拉刷新聊天记录的实现
    tableview直接滚动至最后一行
    UITabBar,UINavigationBar的布局和隐藏问题
    transformjs玩转星球
  • 原文地址:https://www.cnblogs.com/dorothychai/p/4224573.html
Copyright © 2011-2022 走看看