zoukankan      html  css  js  c++  java
  • Hadoop源码分析37 RPC的线程协作

    一、服务器端线程


    1、主线程main,启动其余线程后不参与交互


    21Listener线程,监听连接请求,并在有连接的时候唤醒Reader线程 

    public void run() {

         while(running){

           selector.select();

           IteratorSelectionKeyiter = selector.selectedKeys().iterator();

           while(iter.hasNext()){

               SelectionKey key = iter.next();

               iter.remove();

               if(key.isValid()&&key.isAcceptable()){

                     doAccept(key);

               } 

             }

           }  

         }

       }

     

       void doAccept(SelectionKey key) {

         ServerSocketChannel server = (ServerSocketChannel)key.channel();

         SocketChannel channel;

         while((channel =server.accept()) != null){

           channel.configureBlocking(false);

           channel.socket().setTcpNoDelay(tcpNoDelay);

           Reader reader = getReader();

           try{

             reader.startAdd();

             SelectionKey readKey = reader.registerChannel(channel);

             Connection c =new Connection(readKey, channel,System.currentTimeMillis());

             readKey.attach(c);

             synchronized(connectionList){

               connectionList.add(numConnections,c);

               numConnections++;

             }

           }finally{

             reader.finishAdd();

           }

         }

       }

    其中Reader线程有如下方法

         public void startAdd() {

           adding=true;

           readSelector.wakeup();

         }

     

         public synchronized SelectionKey registerChannel(SocketChannelchannel){

             return channel.register(readSelector,SelectionKey.OP_READ);

         }

         public synchronized void finishAdd() {

           adding=false;

           this.notify();       

         }

    即先将Reader线程的readSelectorselect()的阻塞中唤醒,再为readSelector注册READ操作,再从线程池中唤醒一个线程来获得锁Reader.this


     


    3若干个Reader线程,读取连接请求的数据,整个运行过程中加锁,锁为Reader.this
     

         public void run() {

           synchronized(this) {

             while(running){ 

                 readSelector.select();

                 while(adding){

                   this.wait(1000);

                 }        

                 IteratorSelectionKeyiter = readSelector.selectedKeys().iterator();

                 while(iter.hasNext()){

                   SelectionKey key = iter.next();

                   iter.remove();

                   if(key.isValid() && key.isReadable()) {

                       doRead(key);

                   } 

                 }    

             }

           }

         }
     

    readSelector被唤醒后,若addingtrueListener线程正在正在注册READ事件,则先wait()一会,addingfalse,被Listener线程notify(),获得锁Reader.this得以继续往下运行。

       void doRead(SelectionKey key)   {      

         Connection c =(Connection)key.attachment();       

         count = c.readAndProcess();          

       }  

       public int readAndProcess(){

         while(true){

           int count =channelRead(channel,data);        

           processOneRpc(data.array());

          } 

       }

       private void processOneRpc(byte[] buf) {

         if(headerRead){

           processData(buf);

         }

       }

       private void processData(byte[] buf) {

         DataInputStream dis =

           new DataInputStream(new ByteArrayInputStream(buf));

         intid =dis.readInt(); 

     

         Writable param =ReflectionUtils.newInstance(paramClass,conf);

         param.readFields(dis);    

           

         Call call =new Call(id, param,this);

         callQueue.put(call);             //queue the call; maybe blocked here

         incRpcCount(); // Increment therpc count

       }

    Reader线程读取完连接的数据之后,将其封装成一个Call对象,放入容器callQueue(一个阻塞队列LinkedBlockingQueue)


    4若干个Handler线程,处理连接Call,在处理结果时候加锁, 锁为responseQueue (一个普通LinkedList)
     

    public void run() {

         while(running){

             final Call call =callQueue.take();  

             Writable value =call(call.connection.protocol,call.param,

                              call.timestamp);

             synchronized(call.connection.responseQueue){

               setupResponse(buf, call,

                           (error==null)? Status.SUCCESS:Status.ERROR,

                           value, errorClass, error); 

               responder.doRespond(call);

             }

         }

       }

     

     void doRespond(Call call) {

         synchronized(call.connection.responseQueue){

           call.connection.responseQueue.addLast(call);

           if(call.connection.responseQueue.size()== 1) {

             processResponse(call.connection.responseQueue,true);

           }

         }

       }

     

    private boolean processResponse(LinkedListCall> responseQueue,

                                       boolean inHandler)  { 

           synchronized(responseQueue) { 

             Call call = responseQueue.removeFirst();

             SocketChannel channel = call.connection.channel; 

             channelWrite(channel, call.response); 

             if(call.response.hasRemaining()){  

               call.connection.responseQueue.addFirst(call);           

               if(inHandler) {

                 incPending();    

                 writeSelector.wakeup();

                 channel.register(writeSelector,SelectionKey.OP_WRITE,call);

                 decPending();

               }

             }

           }

         }  

       }

     

       private synchronized void incPending(){  // call waitingto be enqueued.

         pending++;

       }

     

       private synchronized void decPending() { // call doneenqueueing.

         pending--;

         notify();

       }

     

    responseQueue只有一个元素时,Handler线程自己处理结果,若大于1,则让唤醒Response线程的writeSelectorwriteSelector注册OP_WRITE事件


    51Responder线程,处理结果responseQueue并且清理超时的Response.锁为responseQueue(一个普通LinkedList)
     

      public void run() {

         while(running){

             waitPending();    // Ifa channel is being registered, wait.

             writeSelector.select(PURGE_INTERVAL);

             IteratorSelectionKeyiter = writeSelector.selectedKeys().iterator();

             while(iter.hasNext()){

               SelectionKey key = iter.next();

               iter.remove();

               if(key.isValid() &&key.isWritable()) {

                     doAsyncWrite(key);

               }

             } 

     

             ......................

             long now =System.currentTimeMillis();

             if(nowlastPurgeTime +PURGE_INTERVAL){

               continue;

             }

             lastPurgeTime = now;

             ArrayListCallcalls;

             

             //get the list of channels from list of keys.

             synchronized(writeSelector.keys()){

               calls =new ArrayListCall(writeSelector.keys().size());

               iter =writeSelector.keys().iterator();

               while(iter.hasNext()){

                 SelectionKey key = iter.next();

                 Call call = (Call)key.attachment();

                 if(call !=null && key.channel() ==call.connection.channel){

                   calls.add(call);

                 }

               }

             }

             

             for(Call call : calls) {

                 doPurge(call, now);

             }

         }

       }

       private synchronized void waitPending(){

         while(pending0){

           wait();

         }

       }

       private void doAsyncWrite(SelectionKeykey) throws IOException{

         Call call = (Call)key.attachment();

          

         synchronized(call.connection.responseQueue){

           if(processResponse(call.connection.responseQueue,false)){

                key.interestOps(0);          

           }

         }

       }

     

    private boolean processResponse(LinkedListCallresponseQueue,

                                       boolean inHandler)  { 

           synchronized(responseQueue) { 

             Call call = responseQueue.removeFirst();

             SocketChannel channel = call.connection.channel; 

             channelWrite(channel, call.response); 

             if(call.response.hasRemaining()){  

               call.connection.responseQueue.addFirst(call);           

               if(inHandler) {

                 incPending();    

                 writeSelector.wakeup();

                 channel.register(writeSelector, SelectionKey.OP_WRITE, call);

                 decPending();

               }

             }

           }

         }  

       }

     

       private void doPurge(Call call, long now)  {

         LinkedListCallresponseQueue = call.connection.responseQueue;

         synchronized(responseQueue){

           IteratorCalliter =responseQueue.listIterator(0);

           while(iter.hasNext()){

             call = iter.next();

             if(nowcall.timestamp+PURGE_INTERVAL){

               closeConnection(call.connection);

               break;

             }

           }

         }

       }


    总结:Listener 通过 ReadSelector注册OP_READ 唤醒Reader, Reader 通过将Call加入阻塞队列callQueue 唤醒Handler, Handler检查responseQueue通过WriterSelector注册OP_WRITE唤醒Responser



    二、客户端线程


    1、主线程main,锁为请求的封装对象call

      public Writable call(Writable param, ConnectionId remoteId)  {

       Call call =new Call(param);

       Connection connection =getConnection(remoteId,call);

       connection.sendParam(call);                //send the parameter

       boolean interrupted = false;

       synchronized(call){

         while(!call.done){

           call.wait();                          //wait for the result

         }

         return call.value;

       }

      }

      private Connection getConnection(ConnectionIdremoteId,

                                      Call call)  {

       Connection connection;

       do{

         synchronized(connections){

           connection =connections.get(remoteId);

           if(connection ==null){

             connection =new Connection(remoteId);

             connections.put(remoteId,connection);

           }

         }

       } while(!connection.addCall(call));    

       connection.setupIOstreams();

       return connection;

      }

    以下方法的锁为Connection.thisnotify()唤醒一个线程来获取释放的锁Connection.this

       private synchronized boolean addCall(Call call) {

         if(shouldCloseConnection.get())

           return false;

         calls.put(call.id,call);

         notify();

         return true;

       }


    2.Connection线程,锁为Connection.this

       public void run(){ 

         while(waitForWork()) {

           receiveResponse();

         }     

         close();

       }

       private synchronized boolean waitForWork() {

         if(calls.isEmpty()&& !shouldCloseConnection.get()&&running.get()) {

           long timeout =maxIdleTime-

                 (System.currentTimeMillis()-lastActivity.get());

           if(timeout0) {

               wait(timeout);

           }

         }    

         if(!calls.isEmpty()&& !shouldCloseConnection.get()&&running.get()){

           return true;

         }else if(shouldCloseConnection.get()){

           return false;

         }else if(calls.isEmpty()){// idleconnection closed or stopped

           markClosed(null);

           return false;

         }else{// get stoppedbut there are still pending requests

           markClosed((IOException)new IOException().initCause(

               new InterruptedException()));

           return false;

         }

       }

      可见calls不为空,则不再继续wait()

       private void receiveResponse(){

           if(shouldCloseConnection.get()){

             return;

          }

           touch(); 

           int id =in.readInt(); 

           Call call =calls.get(id);

           int state =in.readInt();     

           if(state ==Status.SUCCESS.state){

             Writable value =ReflectionUtils.newInstance(valueClass,conf);

             value.readFields(in);                //read value

             call.setValue(value);

             calls.remove(id);

           }else if(state ==Status.ERROR.state){

           }else if(state ==Status.FATAL.state){

           }

       }

     

    readInt() 最后会调用 PingInputStream.read(),这是一个阻塞方法,当输入流读不到数据时,会一直阻塞直到超时。

       public int read(byte[] buf, int off, int len) {

           do{

             try{

               return super.read(buf, off, len);

             }catch(SocketTimeoutException e) {

               handleTimeout(e);

             }

           }while(true);

         }

       }

     

    以下两个方法的锁为Call.this

       public synchronized void setValue(Writable value) {

         this.value= value;

         callComplete();

       }

       protected synchronized void callComplete(){

         this.done=true;

         notify();                                //notify caller

       }

    可见Connection线程在调用方法call.setValue时,会将call.done设置为true并唤醒主线程在获得锁Call.this从而使得主线程可以进一步运行下去

     

    总结:Main通过将Call加入calls,唤醒Connection,Connection通过读取Call的结果,再唤醒Main


  • 相关阅读:
    Linux下查看CPU型号,内存大小,硬盘空间的命令
    java_opts 参数与JVM内存调优
    less 查看日志
    如何实时查看Linux下日志
    mysql的sql语句的性能诊断分析
    使用zabbix-java-gateway可以通过该网关来监听多个JVM
    性能瓶颈分析
    Git客户端的安装与配置入门
    渗透测试的8个步骤 展现一次完整的渗透测试过程及思路
    Python&selenium&tesseract自动化测试随机码、验证码(Captcha)的OCR识别解决方案参考
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276469.html
Copyright © 2011-2022 走看看