zoukankan      html  css  js  c++  java
  • Hadoop源码分析37 RPC的线程协作

    一、服务器端线程


    1、主线程main,启动其余线程后不参与交互


    21Listener线程,监听连接请求,并在有连接的时候唤醒Reader线程 

    public void run() {

         while(running){

           selector.select();

           IteratorSelectionKeyiter = selector.selectedKeys().iterator();

           while(iter.hasNext()){

               SelectionKey key = iter.next();

               iter.remove();

               if(key.isValid()&&key.isAcceptable()){

                     doAccept(key);

               } 

             }

           }  

         }

       }

     

       void doAccept(SelectionKey key) {

         ServerSocketChannel server = (ServerSocketChannel)key.channel();

         SocketChannel channel;

         while((channel =server.accept()) != null){

           channel.configureBlocking(false);

           channel.socket().setTcpNoDelay(tcpNoDelay);

           Reader reader = getReader();

           try{

             reader.startAdd();

             SelectionKey readKey = reader.registerChannel(channel);

             Connection c =new Connection(readKey, channel,System.currentTimeMillis());

             readKey.attach(c);

             synchronized(connectionList){

               connectionList.add(numConnections,c);

               numConnections++;

             }

           }finally{

             reader.finishAdd();

           }

         }

       }

    其中Reader线程有如下方法

         public void startAdd() {

           adding=true;

           readSelector.wakeup();

         }

     

         public synchronized SelectionKey registerChannel(SocketChannelchannel){

             return channel.register(readSelector,SelectionKey.OP_READ);

         }

         public synchronized void finishAdd() {

           adding=false;

           this.notify();       

         }

    即先将Reader线程的readSelectorselect()的阻塞中唤醒,再为readSelector注册READ操作,再从线程池中唤醒一个线程来获得锁Reader.this


     


    3若干个Reader线程,读取连接请求的数据,整个运行过程中加锁,锁为Reader.this
     

         public void run() {

           synchronized(this) {

             while(running){ 

                 readSelector.select();

                 while(adding){

                   this.wait(1000);

                 }        

                 IteratorSelectionKeyiter = readSelector.selectedKeys().iterator();

                 while(iter.hasNext()){

                   SelectionKey key = iter.next();

                   iter.remove();

                   if(key.isValid() && key.isReadable()) {

                       doRead(key);

                   } 

                 }    

             }

           }

         }
     

    readSelector被唤醒后,若addingtrueListener线程正在正在注册READ事件,则先wait()一会,addingfalse,被Listener线程notify(),获得锁Reader.this得以继续往下运行。

       void doRead(SelectionKey key)   {      

         Connection c =(Connection)key.attachment();       

         count = c.readAndProcess();          

       }  

       public int readAndProcess(){

         while(true){

           int count =channelRead(channel,data);        

           processOneRpc(data.array());

          } 

       }

       private void processOneRpc(byte[] buf) {

         if(headerRead){

           processData(buf);

         }

       }

       private void processData(byte[] buf) {

         DataInputStream dis =

           new DataInputStream(new ByteArrayInputStream(buf));

         intid =dis.readInt(); 

     

         Writable param =ReflectionUtils.newInstance(paramClass,conf);

         param.readFields(dis);    

           

         Call call =new Call(id, param,this);

         callQueue.put(call);             //queue the call; maybe blocked here

         incRpcCount(); // Increment therpc count

       }

    Reader线程读取完连接的数据之后,将其封装成一个Call对象,放入容器callQueue(一个阻塞队列LinkedBlockingQueue)


    4若干个Handler线程,处理连接Call,在处理结果时候加锁, 锁为responseQueue (一个普通LinkedList)
     

    public void run() {

         while(running){

             final Call call =callQueue.take();  

             Writable value =call(call.connection.protocol,call.param,

                              call.timestamp);

             synchronized(call.connection.responseQueue){

               setupResponse(buf, call,

                           (error==null)? Status.SUCCESS:Status.ERROR,

                           value, errorClass, error); 

               responder.doRespond(call);

             }

         }

       }

     

     void doRespond(Call call) {

         synchronized(call.connection.responseQueue){

           call.connection.responseQueue.addLast(call);

           if(call.connection.responseQueue.size()== 1) {

             processResponse(call.connection.responseQueue,true);

           }

         }

       }

     

    private boolean processResponse(LinkedListCall> responseQueue,

                                       boolean inHandler)  { 

           synchronized(responseQueue) { 

             Call call = responseQueue.removeFirst();

             SocketChannel channel = call.connection.channel; 

             channelWrite(channel, call.response); 

             if(call.response.hasRemaining()){  

               call.connection.responseQueue.addFirst(call);           

               if(inHandler) {

                 incPending();    

                 writeSelector.wakeup();

                 channel.register(writeSelector,SelectionKey.OP_WRITE,call);

                 decPending();

               }

             }

           }

         }  

       }

     

       private synchronized void incPending(){  // call waitingto be enqueued.

         pending++;

       }

     

       private synchronized void decPending() { // call doneenqueueing.

         pending--;

         notify();

       }

     

    responseQueue只有一个元素时,Handler线程自己处理结果,若大于1,则让唤醒Response线程的writeSelectorwriteSelector注册OP_WRITE事件


    51Responder线程,处理结果responseQueue并且清理超时的Response.锁为responseQueue(一个普通LinkedList)
     

      public void run() {

         while(running){

             waitPending();    // Ifa channel is being registered, wait.

             writeSelector.select(PURGE_INTERVAL);

             IteratorSelectionKeyiter = writeSelector.selectedKeys().iterator();

             while(iter.hasNext()){

               SelectionKey key = iter.next();

               iter.remove();

               if(key.isValid() &&key.isWritable()) {

                     doAsyncWrite(key);

               }

             } 

     

             ......................

             long now =System.currentTimeMillis();

             if(nowlastPurgeTime +PURGE_INTERVAL){

               continue;

             }

             lastPurgeTime = now;

             ArrayListCallcalls;

             

             //get the list of channels from list of keys.

             synchronized(writeSelector.keys()){

               calls =new ArrayListCall(writeSelector.keys().size());

               iter =writeSelector.keys().iterator();

               while(iter.hasNext()){

                 SelectionKey key = iter.next();

                 Call call = (Call)key.attachment();

                 if(call !=null && key.channel() ==call.connection.channel){

                   calls.add(call);

                 }

               }

             }

             

             for(Call call : calls) {

                 doPurge(call, now);

             }

         }

       }

       private synchronized void waitPending(){

         while(pending0){

           wait();

         }

       }

       private void doAsyncWrite(SelectionKeykey) throws IOException{

         Call call = (Call)key.attachment();

          

         synchronized(call.connection.responseQueue){

           if(processResponse(call.connection.responseQueue,false)){

                key.interestOps(0);          

           }

         }

       }

     

    private boolean processResponse(LinkedListCallresponseQueue,

                                       boolean inHandler)  { 

           synchronized(responseQueue) { 

             Call call = responseQueue.removeFirst();

             SocketChannel channel = call.connection.channel; 

             channelWrite(channel, call.response); 

             if(call.response.hasRemaining()){  

               call.connection.responseQueue.addFirst(call);           

               if(inHandler) {

                 incPending();    

                 writeSelector.wakeup();

                 channel.register(writeSelector, SelectionKey.OP_WRITE, call);

                 decPending();

               }

             }

           }

         }  

       }

     

       private void doPurge(Call call, long now)  {

         LinkedListCallresponseQueue = call.connection.responseQueue;

         synchronized(responseQueue){

           IteratorCalliter =responseQueue.listIterator(0);

           while(iter.hasNext()){

             call = iter.next();

             if(nowcall.timestamp+PURGE_INTERVAL){

               closeConnection(call.connection);

               break;

             }

           }

         }

       }


    总结:Listener 通过 ReadSelector注册OP_READ 唤醒Reader, Reader 通过将Call加入阻塞队列callQueue 唤醒Handler, Handler检查responseQueue通过WriterSelector注册OP_WRITE唤醒Responser



    二、客户端线程


    1、主线程main,锁为请求的封装对象call

      public Writable call(Writable param, ConnectionId remoteId)  {

       Call call =new Call(param);

       Connection connection =getConnection(remoteId,call);

       connection.sendParam(call);                //send the parameter

       boolean interrupted = false;

       synchronized(call){

         while(!call.done){

           call.wait();                          //wait for the result

         }

         return call.value;

       }

      }

      private Connection getConnection(ConnectionIdremoteId,

                                      Call call)  {

       Connection connection;

       do{

         synchronized(connections){

           connection =connections.get(remoteId);

           if(connection ==null){

             connection =new Connection(remoteId);

             connections.put(remoteId,connection);

           }

         }

       } while(!connection.addCall(call));    

       connection.setupIOstreams();

       return connection;

      }

    以下方法的锁为Connection.thisnotify()唤醒一个线程来获取释放的锁Connection.this

       private synchronized boolean addCall(Call call) {

         if(shouldCloseConnection.get())

           return false;

         calls.put(call.id,call);

         notify();

         return true;

       }


    2.Connection线程,锁为Connection.this

       public void run(){ 

         while(waitForWork()) {

           receiveResponse();

         }     

         close();

       }

       private synchronized boolean waitForWork() {

         if(calls.isEmpty()&& !shouldCloseConnection.get()&&running.get()) {

           long timeout =maxIdleTime-

                 (System.currentTimeMillis()-lastActivity.get());

           if(timeout0) {

               wait(timeout);

           }

         }    

         if(!calls.isEmpty()&& !shouldCloseConnection.get()&&running.get()){

           return true;

         }else if(shouldCloseConnection.get()){

           return false;

         }else if(calls.isEmpty()){// idleconnection closed or stopped

           markClosed(null);

           return false;

         }else{// get stoppedbut there are still pending requests

           markClosed((IOException)new IOException().initCause(

               new InterruptedException()));

           return false;

         }

       }

      可见calls不为空,则不再继续wait()

       private void receiveResponse(){

           if(shouldCloseConnection.get()){

             return;

          }

           touch(); 

           int id =in.readInt(); 

           Call call =calls.get(id);

           int state =in.readInt();     

           if(state ==Status.SUCCESS.state){

             Writable value =ReflectionUtils.newInstance(valueClass,conf);

             value.readFields(in);                //read value

             call.setValue(value);

             calls.remove(id);

           }else if(state ==Status.ERROR.state){

           }else if(state ==Status.FATAL.state){

           }

       }

     

    readInt() 最后会调用 PingInputStream.read(),这是一个阻塞方法,当输入流读不到数据时,会一直阻塞直到超时。

       public int read(byte[] buf, int off, int len) {

           do{

             try{

               return super.read(buf, off, len);

             }catch(SocketTimeoutException e) {

               handleTimeout(e);

             }

           }while(true);

         }

       }

     

    以下两个方法的锁为Call.this

       public synchronized void setValue(Writable value) {

         this.value= value;

         callComplete();

       }

       protected synchronized void callComplete(){

         this.done=true;

         notify();                                //notify caller

       }

    可见Connection线程在调用方法call.setValue时,会将call.done设置为true并唤醒主线程在获得锁Call.this从而使得主线程可以进一步运行下去

     

    总结:Main通过将Call加入calls,唤醒Connection,Connection通过读取Call的结果,再唤醒Main


  • 相关阅读:
    Android 按键消息处理Android 按键消息处理
    objcopy
    SQLite多线程读写实践及常见问题总结
    android动画坐标定义
    Android动画效果translate、scale、alpha、rotate
    Android公共库(缓存 下拉ListView 下载管理Pro 静默安装 root运行 Java公共类)
    Flatten Binary Tree to Linked List
    Distinct Subsequences
    Populating Next Right Pointers in Each Node II
    Populating Next Right Pointers in Each Node
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276469.html
Copyright © 2011-2022 走看看