zoukankan      html  css  js  c++  java
  • Hadoop源码分析13: IPC流程(8) Server的wait、notify

    1.Serverwaitnotify

     

    public abstractclass Server {

     

          public synchronized void join()throws InterruptedException {

                 while (running){

                        wait();

                 }

          }     

     

          

          public synchronized void stop(){

                 LOG.info("Stopping server on" + port);

                 running =false;

                if (handlers !=null) {

                        for (int i = 0; i handlerCount; i++) {

                              if (handlers[i]!= null) {

                                     handlers[i].interrupt();

                              }

                        }

                 }

                 listener.interrupt();

                 listener.doStop();

                 responder.interrupt();

                 notifyAll();

          }

     

    }

     

    2. ServerListenerReaderwaitnotify

     

    publicclass ServerListenerReader implements Runnable{

     

       public void run(){

         synchronized (this) {

           while(serverListener.server.running) {

            SelectionKey key = null;

            try {

             readSelector.select();

              while (adding) {

               this.wait(1000);

              }            

     

             IteratorSelectionKey iter =readSelector.selectedKeys().iterator();

              while (iter.hasNext()){

                key =iter.next();

               iter.remove();

                if(key.isValid()) {

                 if (key.isReadable()) {

                  serverListener.doRead(key);

                 }

               }

                key =null;

              }

            } catch (InterruptedException e) {

              if(serverListener.server.running) {                  // unexpected -- log it

              }

            } catch (IOException ex) {

             }

          }

         }

       }

     

     

       public synchronized void finishAdd(){

         adding = false;

         this.notify();       

       }

    }

     

     

    3. ServerResponderwaitnotify

     

    publicclass ServerResponder extends Thread {

     

       public void run(){

             Server.SERVER.set(server);

         long lastPurgeTime = 0;  // last check for old calls.

     

         while (server.running){

           try{

           waitPending();    // If a channel is being registered,wait.

            writeSelector.select(PURGE_INTERVAL);

            IteratorSelectionKeyiter = writeSelector.selectedKeys().iterator();

            while (iter.hasNext()) {

              SelectionKey key =iter.next();

              iter.remove();

              try {

                if(key.isValid() && key.isWritable()) {

                   doAsyncWrite(key);

               }

              } catch (IOException e){

              }

            }

            long now =System.currentTimeMillis();

            if (now lastPurgeTime +PURGE_INTERVAL) {

              continue;

            }

            lastPurgeTime = now;

            //

            // If there were some calls that have not beensent out for a

            // long time, discard them.

            //

            ArrayListServerCallcalls;

            

            // get the list of channels from list ofkeys.

            synchronized (writeSelector.keys()) {

              calls = newArrayListServerCall(writeSelector.keys().size());

              iter =writeSelector.keys().iterator();

              while (iter.hasNext()){

               SelectionKey key = iter.next();

                ServerCallcall = (ServerCall)key.attachment();

                if (call!= null && key.channel() == call.connection.channel){ 

                 calls.add(call);

               }

              }

            }

            

            for(ServerCall call : calls) {

              try {

               doPurge(call, now);

              } catch (IOException e){

              }

            }

           } catch(OutOfMemoryError e) {

            //

            // we can run out of memory if we have too manythreads

            // log the event and sleep for a minute andgive

            // some thread(s) a chance to finish

            //

             try { Thread.sleep(60000); }catch (Exception ie) {}

           } catch(Exception e) {

           }

         }

        }

     

     

       private synchronized void waitPending()throws InterruptedException {

         while (pending0) {

          wait();

         }

       }

       private synchronized void decPending(){ // call done enqueueing.

         pending--;

         notify();

       }

     

    }

  • 相关阅读:
    【转】Hibernate入门实例
    【J2EE】Java连接SQL Server 2000问题:“com.microsoft.sqlserver.jdbc.SQLServerException:用户'sa'登录失败。该用户与可信SQL Server连接无关联”
    【转】Java JDBC连接SQL Server2005错误:通过端口 1433 连接到主机 localhost 的 TCP/IP 连接失败
    linux命令行下的ftp 多文件下载和目录下载(转)
    Ubuntu下部署java JDK和eclipse IDE
    构建第一个Spring Boot2.0应用之集成dubbo上---环境搭建(九)
    构建第一个Spring Boot2.0应用之集成mybatis(六)
    构建第一个Spring Boot2.0应用之Controller(三)
    linux修改系统时间为北京时间(CentOS)
    构建第一个Spring Boot2.0应用之application.properties和application.yml(八)
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276522.html
Copyright © 2011-2022 走看看