  • Kafka Network Communication: Flow and Source Code Analysis

    1. The Reactor pattern

    One thread monitors a set of connections, synchronously waits for one or more events to arrive, and then dispatches each event to the corresponding handler for processing.
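
    As a minimal illustration of the pattern (not Kafka code), here is a single-threaded reactor sketch using Java NIO from Scala; the port and the trivial echo "handler" are placeholders:

    import java.net.InetSocketAddress
    import java.nio.ByteBuffer
    import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

    // One dispatcher thread: a single Selector watches every channel and hands
    // each ready event to the handler for that event type (accept vs. read).
    object MiniReactor {
      def main(args: Array[String]): Unit = {
        val selector = Selector.open()
        val server = ServerSocketChannel.open()
        server.configureBlocking(false)
        server.socket().bind(new InetSocketAddress(9999)) // placeholder port
        server.register(selector, SelectionKey.OP_ACCEPT)

        while (true) {
          selector.select() // synchronously wait for one or more events
          val keys = selector.selectedKeys().iterator()
          while (keys.hasNext) {
            val key = keys.next(); keys.remove()
            if (key.isAcceptable) { // "accept handler": register the new connection for reads
              val client = server.accept()
              client.configureBlocking(false)
              client.register(selector, SelectionKey.OP_READ)
            } else if (key.isReadable) { // "read handler": echo the bytes back
              val channel = key.channel().asInstanceOf[SocketChannel]
              val buf = ByteBuffer.allocate(1024)
              if (channel.read(buf) < 0) channel.close()
              else { buf.flip(); channel.write(buf) }
            }
          }
        }
      }
    }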

    2. Kafka's network communication model

    Quoting a passage from the comments in the source code (2.0.0):

    An NIO socket server. The threading model is 1 Acceptor thread that handles new connections
    Acceptor has N Processor threads that each have their own selector and read requests from sockets
    M Handler threads that handle requests and produce responses back to the processor threads for writing.
    

    Put simply, the broker has an Acceptor thread that listens for new connections and establishes them. Once a connection is established, a Processor is chosen to manage it. When an event arrives on the connection, the Processor wraps it into a Request and puts it into the shared request queue; the Handler thread pool takes requests from that queue and does the actual processing. When processing is done, the Handler puts the response into that Processor's response queue, and the Processor sends the response back to the client.

    The N and M above correspond to num.network.threads and num.io.threads, whose default values are 3 and 8 respectively.
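
    For reference (not from the original post), these settings live in the broker's server.properties; the values shown are the defaults:

    # number of network (Processor) threads per listener
    num.network.threads=3
    # number of I/O (Handler) threads
    num.io.threads=8
    # maximum number of queued requests before the network threads block
    queued.max.requests=500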

    Kafka 2.5.0 divides requests into two categories:

    data-plane requests and control-plane requests. To let control-plane requests be handled with higher priority rather than queue behind data-plane traffic, the control plane gets its own listener with just one Processor thread (plus one Acceptor and one Handler thread), as the class comment below describes:

    /**
     * Handles new connections, requests and responses to and from broker.
     * Kafka supports two types of request planes :
     *  - data-plane :
     *    - Handles requests from clients and other brokers in the cluster.
     *    - The threading model is
     *      1 Acceptor thread per listener, that handles new connections.
     *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
     *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
     *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
     *  - control-plane :
     *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
     *      If not configured, the controller requests are handled by the data-plane.
     *    - The threading model is
     *      1 Acceptor thread that handles new connections
     *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
     *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
     */
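
    For illustration only (not from the post), enabling the dedicated control plane means giving controller traffic its own listener; the listener name, host and ports below are placeholders:

    # illustrative server.properties fragment
    listeners=PLAINTEXT://:9092,CONTROLLER://:9094
    advertised.listeners=PLAINTEXT://broker1:9092,CONTROLLER://broker1:9094
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    # route requests from the controller through the control plane
    control.plane.listener.name=CONTROLLER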
    

    3. Source code analysis

    1. The Kafka core module

    The network package is made up of two parts: RequestChannel and SocketServer.

    1.SocketServer

    SocketServer manages the Processor threads, the Acceptor threads, the RequestChannel objects, and so on.

    class SocketServer(val config: KafkaConfig,
                       val metrics: Metrics,
                       val time: Time,
                       val credentialProvider: CredentialProvider)
      extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
    
      // Length of the request queue (config queued.max.requests, default 500)
      private val maxQueuedRequests = config.queuedMaxRequests
    
      private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
      this.logIdent = logContext.logPrefix
    
      private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
      private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
      private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
      memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
      private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
      // data-plane
      // Processor threads that handle data-plane requests
      private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
      // Acceptor threads that accept data-plane connections (one per endpoint)
      private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
      // RequestChannel for data-plane requests
      val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
      // control-plane
      // Processor thread for control-plane requests (a single thread)
      private var controlPlaneProcessorOpt : Option[Processor] = None
      // Acceptor thread for control-plane requests (a single thread)
      private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
      // RequestChannel for control-plane requests
      val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))
    }
    

    2.RequestChannel

    RequestChannel adds and removes Processors and relays Requests and Responses between the network threads and the handler threads. The code is as follows:

    class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
      import RequestChannel._
      val metrics = new RequestChannel.Metrics
      // Shared, thread-safe blocking queue of requests; queueSize is queued.max.requests (default 500), passed in by SocketServer
      private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
      // Pool of Processor threads
      private val processors = new ConcurrentHashMap[Int, Processor]()
      val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
      val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
    
      newGauge(requestQueueSizeMetricName, () => requestQueue.size)
    
      newGauge(responseQueueSizeMetricName, () => {
        processors.values.asScala.foldLeft(0) {(total, processor) =>
          total + processor.responseQueueSize
        }
      })
    
      // Add a Processor thread
      def addProcessor(processor: Processor): Unit = {
        if (processors.putIfAbsent(processor.id, processor) != null)
          warn(s"Unexpected processor with processorId ${processor.id}")
    
        newGauge(responseQueueSizeMetricName, () => processor.responseQueueSize,
          Map(ProcessorMetricTag -> processor.id.toString))
      }
    
      // Remove a Processor thread
      def removeProcessor(processorId: Int): Unit = {
        processors.remove(processorId)
        removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
      }
    
      /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
      def sendRequest(request: RequestChannel.Request): Unit = {
        requestQueue.put(request)
      }
    
      /** Send a response back to the socket server to be sent over the network */
      def sendResponse(response: RequestChannel.Response): Unit = {
        if (isTraceEnabled) {
          val requestHeader = response.request.header
          val message = response match {
            case sendResponse: SendResponse =>
              s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
            case _: NoOpResponse =>
              s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
            case _: CloseConnectionResponse =>
              s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
            case _: StartThrottlingResponse =>
              s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
            case _: EndThrottlingResponse =>
              s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
          }
          trace(message)
        }
    
        val processor = processors.get(response.processor)
        // The processor may be null if it was shutdown. In this case, the connections
        // are closed, so the response is dropped.
        // Put the response into the corresponding Processor's response queue
        if (processor != null) {
          processor.enqueueResponse(response)
        }
      }
    
      /** Get the next request or block until specified time has elapsed */
      def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
        requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
    
      /** Get the next request or block until there is one */
      def receiveRequest(): RequestChannel.BaseRequest =
        requestQueue.take()
    
      def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]): Unit = {
        errors.foreach { case (error, count) =>
          metrics(apiKey.name).markErrorMeter(error, count)
        }
      }
    
      def clear(): Unit = {
        requestQueue.clear()
      }
    
      def shutdown(): Unit = {
        clear()
        metrics.close()
      }
    
      def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
    
    }
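
    To make the hand-off concrete, here is a toy model (again, not Kafka code) of what RequestChannel does with its ArrayBlockingQueue: a network thread put()s a request, and a handler thread poll()s with a timeout so it can periodically check for shutdown, mirroring sendRequest and receiveRequest(timeout) above:

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    object RequestChannelHandoffDemo {
      def main(args: Array[String]): Unit = {
        val requestQueue = new ArrayBlockingQueue[String](500) // queued.max.requests

        val networkThread = new Thread(() => requestQueue.put("ProduceRequest")) // Processor side
        val handlerThread = new Thread(() => {                                   // Handler side
          val req = requestQueue.poll(300, TimeUnit.MILLISECONDS)                // like receiveRequest(300)
          if (req != null) println(s"handling $req")
        })
        networkThread.start(); handlerThread.start()
        networkThread.join(); handlerThread.join()
      }
    }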
    

    3.Acceptor

    The Acceptor waits for accept events, obtains a SocketChannel via serverSocketChannel.accept(), and hands it over to a Processor, which from then on handles the I/O events on that connection. The source is as follows:

    /**
     * Thread that accepts and configures new connections. There is one of these per endpoint.
     */
    private[kafka] class Acceptor(val endPoint: EndPoint, // host and port information
                                  val sendBufferSize: Int, // socket.send.buffer.bytes, default 102400 B (100 KB)
                                  val recvBufferSize: Int, // socket.receive.buffer.bytes, default 102400 B (100 KB)
                                  brokerId: Int,
                                  connectionQuotas: ConnectionQuotas,
                                  metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    
      //Selector
      private val nioSelector = NSelector.open()
      //Create a server socket to listen for connections on.
      val serverChannel = openServerSocket(endPoint.host, endPoint.port)
      // Array holding this Acceptor's Processor threads
      private val processors = new ArrayBuffer[Processor]()
      private val processorsStarted = new AtomicBoolean
      private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
        "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
    
      private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
        processors ++= newProcessors
        if (processorsStarted.get)
          startProcessors(newProcessors, processorThreadPrefix)
      }
    
      private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
        if (!processorsStarted.getAndSet(true)) {
          startProcessors(processors, processorThreadPrefix)
        }
      }
    
      private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
        processors.foreach { processor =>
          KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
            processor).start()
        }
      }
    
      private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
        // Shutdown `removeCount` processors. Remove them from the processor list first so that no more
        // connections are assigned. Shutdown the removed processors, closing the selector and its connections.
        // The processors are then removed from `requestChannel` and any pending responses to these processors are dropped.
        val toRemove = processors.takeRight(removeCount)
        processors.remove(processors.size - removeCount, removeCount)
        toRemove.foreach(_.shutdown())
        toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
      }
    
      override def shutdown(): Unit = {
        super.shutdown()
        synchronized {
          processors.foreach(_.shutdown())
        }
      }
    
      /**
       * Accept loop that checks for new connection attempts
       */
      def run(): Unit = {
        // Register interest in OP_ACCEPT events
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
        //Record that the thread startup is complete
        startupComplete()
        try {
          var currentProcessorIndex = 0
          while (isRunning) {
            try {
    
              // Select with a 500 ms timeout
              val ready = nioSelector.select(500)
              // If any events are ready
              if (ready > 0) {
                val keys = nioSelector.selectedKeys()
                val iter = keys.iterator()
                // Iterate over the ready keys
                while (iter.hasNext && isRunning) {
                  try {
                    val key = iter.next
                    iter.remove()
    
                    if (key.isAcceptable) {
                      // Call accept() to obtain the SocketChannel
                      accept(key).foreach { socketChannel =>
    
                        // Assign the channel to the next processor (using round-robin) to which the
                        // channel can be added without blocking. If newConnections queue is full on
                        // all processors, block until the last one is able to accept a connection.
                        var retriesLeft = synchronized(processors.length)
                        var processor: Processor = null
                        do {
                          retriesLeft -= 1
                          // Pick the next Processor (round-robin)
                          processor = synchronized {
                            // adjust the index (if necessary) and retrieve the processor atomically for
                            // correct behaviour in case the number of processors is reduced dynamically
                            currentProcessorIndex = currentProcessorIndex % processors.length
                            processors(currentProcessorIndex)
                          }
                          currentProcessorIndex += 1
                          // Hand the SocketChannel to the chosen Processor
                        } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                      }
                    } else
                      throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                  } catch {
                    case e: Throwable => error("Error while accepting connection", e)
                  }
                }
              }
            }
            catch {
              // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
              // to a select operation on a specific channel or a bad request. We don't want
              // the broker to stop responding to requests from other clients in these scenarios.
              case e: ControlThrowable => throw e
              case e: Throwable => error("Error occurred", e)
            }
          }
        } finally {
          debug("Closing server socket and selector.")
          CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
          CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
          shutdownComplete()
        }
      }
    
      /**
      * Create a server socket to listen for connections on.
      */
      private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
        val socketAddress =
          if (host == null || host.trim.isEmpty)
            new InetSocketAddress(port)
          else
            new InetSocketAddress(host, port)
        val serverChannel = ServerSocketChannel.open()
        serverChannel.configureBlocking(false)
        if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
          serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    
        try {
          serverChannel.socket.bind(socketAddress)
          info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
        } catch {
          case e: SocketException =>
            throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
        }
        serverChannel
      }
    
      /**
       * Accept a new connection
       */
      private def accept(key: SelectionKey): Option[SocketChannel] = {
        val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
        // Call serverSocketChannel.accept() to obtain the SocketChannel
        val socketChannel = serverSocketChannel.accept()
        try {
          connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
          // Configure the channel as non-blocking
          socketChannel.configureBlocking(false)
          // Disable Nagle's algorithm (TCP_NODELAY)
          socketChannel.socket().setTcpNoDelay(true)
          // Enable TCP keepalive
          socketChannel.socket().setKeepAlive(true)
          if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socketChannel.socket().setSendBufferSize(sendBufferSize)
          Some(socketChannel)
        } catch {
          case e: TooManyConnectionsException =>
            info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
            close(endPoint.listenerName, socketChannel)
            None
        }
      }
    
      private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
        if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
          debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
            s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
            s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]" +
            s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
          true
        } else
          false
      }
    
      /**
       * Wakeup the thread for selection.
       */
      @Override
      def wakeup = nioSelector.wakeup()
    
    }
    

    4.Processor

    The Processor wraps incoming I/O events into Request objects and puts them into the blocking queue, returns to clients the Responses that the I/O threads have placed in its response queue, and runs the Response callbacks. The code is as follows:

    /**
     * Thread that processes all requests from a single connection. There are N of these running in parallel
     * each of which has its own selector
     */
    private[kafka] class Processor(val id: Int,
                                   time: Time,
                                   maxRequestSize: Int,
                                   requestChannel: RequestChannel,
                                   connectionQuotas: ConnectionQuotas,
                                   connectionsMaxIdleMs: Long,
                                   failedAuthenticationDelayMs: Int,
                                   listenerName: ListenerName,
                                   securityProtocol: SecurityProtocol,
                                   config: KafkaConfig,
                                   metrics: Metrics,
                                   credentialProvider: CredentialProvider,
                                   memoryPool: MemoryPool,
                                   logContext: LogContext,
                                   connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    
      private object ConnectionId {
        def fromString(s: String): Option[ConnectionId] = s.split("-") match {
          case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
            BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
              ConnectionId(localHost, localPort, remoteHost, remotePort, Integer.parseInt(index))
            }
          }
          case _ => None
        }
      }
    
      private[network] case class ConnectionId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int, index: Int) {
        override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
      }
    
      // Blocking queue of SocketChannels handed over by the Acceptor
      private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
      // Responses currently in flight; entries are removed later (e.g. once the send completes)
      private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
      // Responses handed back by the I/O (handler) threads, waiting to be sent
      private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
      ...
      ...
      override def run(): Unit = {
        startupComplete()
        try {
          while (isRunning) {
            try {
              // setup any new connections that have been queued up
              // Register all queued channels in newConnections with this Processor's selector,
              // i.e. selector.register(connectionId(channel.socket), channel)
              configureNewConnections()
              // register any new responses for writing
              // Send queued responses and record them in inflightResponses
              processNewResponses()
              // Poll for I/O events that are ready on this Processor's socket channels
              poll()
              // Wrap completed receives into Requests and put them into the request queue
              processCompletedReceives()
              // Run callbacks for responses in inflightResponses that were sent successfully
              processCompletedSends()
              // Remove entries in inflightResponses whose connections have been disconnected
              processDisconnected()
              // Close the lowest-priority connections if the broker is over its connection limit
              closeExcessConnections()
            } catch {
              // We catch all the throwables here to prevent the processor thread from exiting. We do this because
              // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
              // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
              // be either associated with a specific socket channel or a bad request. These exceptions are caught and
              // processed by the individual methods above which close the failing channel and continue processing other
              // channels. So this catch block should only ever see ControlThrowables.
              case e: Throwable => processException("Processor got uncaught exception.", e)
            }
          }
        } finally {
          debug(s"Closing selector - processor $id")
          CoreUtils.swallow(closeAll(), this, Level.ERROR)
          shutdownComplete()
        }
      }
    }
    

    2.KafkaRequestHandlerPool

    1.KafkaRequestHandlerPool

    The I/O thread pool, i.e. the pool of threads that actually process requests. The source is as follows:

    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel, // the RequestChannel owned by the SocketServer
                                  val apis: KafkaApis, // the logic that handles every kind of Kafka request
                                  time: Time,
                                  numThreads: Int, // number of I/O threads
                                  requestHandlerAvgIdleMetricName: String,
                                  logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
    
      // Grown/shrunk dynamically in resizeThreadPool()
      private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
      /* a meter to track the average free capacity of the request handlers */
      private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
    
      this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
      // Buffer holding the handler threads
      val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
      for (i <- 0 until numThreads) {
        createHandler(i)
      }
    
      // Create an I/O thread (a KafkaRequestHandler) and start it
      def createHandler(id: Int): Unit = synchronized {
        runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
        KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
      }
    
      def resizeThreadPool(newSize: Int): Unit = synchronized {
        val currentSize = threadPoolSize.get
        info(s"Resizing request handler thread pool size from $currentSize to $newSize")
        if (newSize > currentSize) {
          for (i <- currentSize until newSize) {
            createHandler(i)
          }
        } else if (newSize < currentSize) {
          for (i <- 1 to (currentSize - newSize)) {
            runnables.remove(currentSize - i).stop()
          }
        }
        threadPoolSize.set(newSize)
      }
    
      def shutdown(): Unit = synchronized {
        info("shutting down")
        for (handler <- runnables)
          handler.initiateShutdown()
        for (handler <- runnables)
          handler.awaitShutdown()
        info("shut down completely")
      }
    }
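
    A side note (not from the original post): num.io.threads is a dynamically updatable broker config, which is what resizeThreadPool supports at runtime. An illustrative invocation, where the bootstrap address and broker id 0 are placeholders:

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config num.io.threads=16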
    

    2. The KafkaRequestHandler class

    A KafkaRequestHandler takes requests from the requestChannel and calls KafkaApis.handle() to process them. The source is as follows:

    /**
     * A thread that answers kafka requests.
     */
    class KafkaRequestHandler(id: Int,
                              brokerId: Int,
                              val aggregateIdleMeter: Meter,
                              val totalHandlerThreads: AtomicInteger,
                              val requestChannel: RequestChannel,
                              apis: KafkaApis,
                              time: Time) extends Runnable with Logging {
      this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
      private val shutdownComplete = new CountDownLatch(1)
      @volatile private var stopped = false
    
      def run(): Unit = {
        while (!stopped) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
    
          // Take the next request, waiting up to 300 ms
          val req = requestChannel.receiveRequest(300)
          val endTime = time.nanoseconds
          val idleTime = endTime - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
    
          req match {
              // A ShutdownRequest: shut this handler down
            case RequestChannel.ShutdownRequest =>
              debug(s"Kafka request handler $id on broker $brokerId received shut down command")
              shutdownComplete.countDown()
              return
    
              // A normal request
            case request: RequestChannel.Request =>
              try {
                request.requestDequeueTimeNanos = endTime
                trace(s"Kafka request handler $id on broker $brokerId handling request $request")
                // Dispatch to the request-handling logic; there are 40+ request types
                apis.handle(request)
              } catch {
                case e: FatalExitError =>
                  shutdownComplete.countDown()
                  Exit.exit(e.statusCode)
                case e: Throwable => error("Exception when handling request", e)
              } finally {
                request.releaseBuffer()
              }
    
            case null => // continue
          }
        }
        shutdownComplete.countDown()
      }
    
      def stop(): Unit = {
        stopped = true
      }
    
      def initiateShutdown(): Unit = requestChannel.sendShutdownRequest()
    
      def awaitShutdown(): Unit = shutdownComplete.await()
    
    }
    

    4. Summary of the flow

    Putting it all together: the Acceptor (one per listener) accepts a client connection and assigns it, round-robin, to one of the N Processor threads, each of which has its own selector. The Processor reads a request off the socket and puts it into the RequestChannel's shared request queue; one of the M KafkaRequestHandler threads takes it, calls KafkaApis.handle(), and enqueues the response into that Processor's response queue, from which the Processor writes it back to the client.

  • Original post: https://www.cnblogs.com/jordan95225/p/13407355.html