zoukankan      html  css  js  c++  java
  • Hadoop源码分析10: IPC流程(5) Atomic

    1. Client类的running

    用于在主线程的 Client 和若干 ClientConnection 线程之间共享 Client 的 running 变量

    public classClient {
      public AtomicBoolean running = newAtomicBoolean(true); // if client runs

     public void stop() {
      ...............
        if(!running.compareAndSet(true, false)) {
         return;
        }
     .................
     }

      private ClientConnectiongetConnection(ClientConnectionId remoteId,
                                  ClientCallcall)
                                  throwsIOException, InterruptedException {
        if(!running.get()) {
          //the client is stopped
          thrownew IOException("The client is stopped");
        }

    }


    public class ClientConnection extends Thread {
        private synchronizedboolean waitForWork() {
          if(calls.isEmpty() && !shouldCloseConnection.get() && client.running.get()) {
           long timeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if (timeout>0) {
             try {
              wait(timeout);
             } catch (InterruptedExceptione) {}
           }
         }
        if (!calls.isEmpty()&& !shouldCloseConnection.get() && client.running.get()) {
           return true;
          } 
        。。。。。。。。。。。。。。
    }

    public class ClientConnectionPingInputStream extendsFilterInputStream {

        private voidhandleTimeout(SocketTimeoutException e) throws IOException{
          if(clientConnection.shouldCloseConnection.get() ||!clientConnection.client.running.get()  ||clientConnection.rpcTimeout > 0) {
           throw e;
          }else {
           sendPing();
         }
        }
    。。。。。。
    }

    2. ClientConnection类的lastActivity

    用于若干ClientConnection线程之间共享lastActivity

    public class ClientConnection extends Thread {

        private AtomicLonglastActivity = new AtomicLong();// last I/O activity time
        
       
        private voidtouch() {
         lastActivity.set(System.currentTimeMillis());
        }
     
       private synchronizedboolean waitForWork() {
          if(calls.isEmpty() && !shouldCloseConnection.get() && client.running.get()) {
           long timeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if (timeout>0) {
             try {
              wait(timeout);
             } catch (InterruptedExceptione) {}
           }
         }
        }
    }

    3. ClientConnection类的shouldCloseConnection

    用于若干ClientConnection 线程之间共享shouldCloseConnection 

    public class ClientConnection extends Thread {

        AtomicBooleanshouldCloseConnection = new AtomicBoolean(); 
       // indicate if theconnection is closed

        public synchronizedboolean addCall(ClientCall call) {
          if(shouldCloseConnection.get())
           return false;
         calls.put(call.id, call);
         notify();
         return true;
        }

       public synchronized voidsetupIOstreams() throws InterruptedException {
          if(socket != null || shouldCloseConnection.get()) {
           return;
         }
        .........
       }

        private synchronizedboolean waitForWork() {
          if(calls.isEmpty() && !shouldCloseConnection.get() && client.running.get()) {
           long timeout = maxIdleTime-
               (System.currentTimeMillis()-lastActivity.get());
           if (timeout>0) {
             try {
              wait(timeout);
             } catch (InterruptedExceptione) {}
           }
         }
         
          if(!calls.isEmpty() && !shouldCloseConnection.get()&& client.running.get()) {
           return true;
          }else if (shouldCloseConnection.get()) {
           return false;
          }else if (calls.isEmpty()) { // idle connection closed orstopped
           markClosed(null);
           return false;
          }else { // get stopped but there are still pendingrequests 
           markClosed((IOException)newIOException().initCause(
               newInterruptedException()));
           return false;
         }
        }

       public voidsendParam(ClientCall call) {
          if(shouldCloseConnection.get()) {
           return;
         }
       }

        private voidreceiveResponse() {
          if(shouldCloseConnection.get()) {
           return;
         }
         touch();
       }
       
        private synchronizedvoid markClosed(IOException e) {
          if(shouldCloseConnection.compareAndSet(false, true)) {
           closeException = e;
           notifyAll();
         }
        }
        
       
        private synchronizedvoid close() {
          if(!shouldCloseConnection.get()) {
            return;
         }
        }
    }


    publicclass ClientConnectionPingInputStream extendsFilterInputStream {

        privatevoid handleTimeout(SocketTimeoutExceptione) throws IOException {
          if(clientConnection.shouldCloseConnection.get() ||!clientConnection.client.running.get()  ||clientConnection.rpcTimeout > 0) {
           throw e;
          }else {
           sendPing();
         }
        }
    。。。。。。
    }
  • 相关阅读:
    Python3标准库:fnmatch UNIX式glob模式匹配
    Python3标准库:glob文件名模式匹配
    Python3标准库:pathlib文件系统路径作为对象
    Python3标准库:os.path平台独立的文件名管理
    Python3标准库:statistics统计计算
    36-Docker 的两类存储资源
    第四章-操作列表
    35-外部世界如何访问容器?
    34-容器如何访问外部世界?
    33-容器间通信的三种方式
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276527.html
Copyright © 2011-2022 走看看