zoukankan      html  css  js  c++  java
  • kafka.network.AbstractServerThread中的线程协作机制

    这个虚类是kafka.network.Acceptor和kafka.network.Processor的父类,提供了一个抽象的Sever线程。

    它的有趣之处在于为子类的启动和停止提供了线程间的协作机制。

    当子类的shutdown方法被调用时,子类可以得知自己被停止,在子类做了适当的处理和清理后,调用自己的shutdownComplete方法,使得对子类shutdown方法的调用从阻塞状态返回,从而使调用线程得知子类的对象已经恰当的停止。

    即,在另一个线程中要关闭一个AbstractServerThread,可以执行它shutdown方法,当此方法从阻塞中返回,代表它已经恰当的关闭。

    同样,对子类的awaitStartup方法调用也会阻塞,直到子类确认自己完全启动,这个方法调用才会返回。

    这些功能是通过对CountdownLatch和AtomicBoolean的使用来实现的。

    private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
    
      protected val selector = Selector.open();
      private val startupLatch = new CountDownLatch(1)
      private val shutdownLatch = new CountDownLatch(1)
      private val alive = new AtomicBoolean(false)
    
      /**
       * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
       */
      def shutdown(): Unit = {
        alive.set(false)
        selector.wakeup()
        shutdownLatch.await
      }
    
      /**
       * Wait for the thread to completely start up
       */
      def awaitStartup(): Unit = startupLatch.await
    
      /**
       * Record that the thread startup is complete
       */
      protected def startupComplete() = {
        alive.set(true)
        startupLatch.countDown
      }
    
      /**
       * Record that the thread shutdown is complete
       */
      protected def shutdownComplete() = shutdownLatch.countDown
    
      /**
       * Is the server still running?
       */
      protected def isRunning = alive.get
      
      /**
       * Wakeup the thread for selection.
       */
      def wakeup() = selector.wakeup()
      
    }
    

    由于它代表了一个Server线程,在其内部使用了java.nio的Selector。所以在shutdown时,需要调用Selector的wakeup方法,使得对Selector的select方法的调用从阻塞中返回。 

    继承它的子类必须对isRunning进行判断,来确定自己是否已经被要求关闭。以及在处理关闭请求后,调用shutdownComplete()来确认已完闭完成。

    由于Acceptor和Processor的实现太长,这里写了一个例子模拟它们

    private class Processor extends AbstractServerThread {
      override def run() {
        while(isRunning) {
          println("processor is running")
          //执行一些操作
          Thread.sleep(1000)
        }
        shutdownComplete()
      }
    
    }
    

      在工作循环中判断isRunning作为退出循环的条件。然后执行shutdownComplete, 这时对Processor 的shutdown方法的调用才会返回。

  • 相关阅读:
    数论算法 剩余系相关 学习笔记 (基础回顾,(ex)CRT,(ex)lucas,(ex)BSGS,原根与指标入门,高次剩余,Miller_Rabin+Pollard_Rho)
    51Nod1123 X^A Mod B 数论 中国剩余定理 原根 BSGS
    BZOJ2219 数论之神 数论 中国剩余定理 原根 BSGS
    BZOJ3583 杰杰的女性朋友 矩阵
    BZOJ2821 作诗(Poetize) 主席树 bitset
    BZOJ2178 圆的面积并 计算几何 辛普森积分
    BZOJ1058 [ZJOI2007]报表统计 set
    BZOJ2480 Spoj3105 Mod 数论 扩展BSGS
    BZOJ1095 [ZJOI2007]Hide 捉迷藏 动态点分治 堆
    AtCoder Regular Contest 101 (ARC101) D
  • 原文地址:https://www.cnblogs.com/devos/p/3750565.html
Copyright © 2011-2022 走看看