zoukankan      html  css  js  c++  java
  • Kafka 0.8 NIO通信机制

    一、Kafka通信机制的整体结构

    image
    同时,这也是SEDA多线程模型。

    1. 对于broker来说,客户端连接数量有限,不会频繁新建大量连接。因此一个Acceptor thread线程处理新建连接绰绰有余。
    2. Kafka高吐吞量,则要求broker接收和发送数据必须快速,因此用proccssor thread线程池处理,并把读取客户端数据转交给缓冲区,不会导致客户端请求大量堆积。
    3. Kafka磁盘操作比较频繁会且有io阻塞或等待,IO Thread线程数量一般设置为proccssor thread num两倍,可以根据运行环境需要进行调节。

    二、SocketServer整体设计时序图

    image

    Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。

    下面我们就针对以上整体设计思路分开讲解各个不同部分的源代码。

    2.1 启动初始化工作

    /**
       * Start the socket server
       */
      def startup() {
        this.synchronized {
    
          connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    
          val sendBufferSize = config.socketSendBufferBytes
          val recvBufferSize = config.socketReceiveBufferBytes
          val brokerId = config.brokerId
    
          var processorBeginIndex = 0
          endpoints.values.foreach { endpoint =>
            val protocol = endpoint.protocolType
            val processorEndIndex = processorBeginIndex + numProcessorThreads
    
            for (i <- processorBeginIndex until processorEndIndex)
              processors(i) = newProcessor(i, connectionQuotas, protocol)
    
            val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
              processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
            acceptors.put(endpoint, acceptor)
            Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
            acceptor.awaitStartup()
    
            processorBeginIndex = processorEndIndex
          }
        }
    
        newGauge("NetworkProcessorAvgIdlePercent",
          new Gauge[Double] {
            def value = allMetricNames.map( metricName =>
              metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
          }
        )
    
        info("Started " + acceptors.size + " acceptor threads")
      }
    
    • ConnectionQuotas对象负责管理连接数/IP,
    • 创建一个Acceptor侦听者线程,初始化N个Processor线程,processors是一个线程数组,可以作为线程池使用,默认是三个.
    • Acceptor线程和N个Processor线程中每个线程都独立创建Selector.open()多路复用器.
    val numProcessorThreads = config.numNetworkThreads //num.network.threads=3 配置的是16
    val serverChannel = openServerSocket(endPoint.host, endPoint.port)
    

    2.2 Acceptor线程

    /**
       * Accept loop that checks for new connection attempts
       */
      def run() {
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 2.1.1 注册OP_ACCEPT事件
        startupComplete()
        try {
          var currentProcessor = 0
          while (isRunning) {
            try {
              val ready = nioSelector.select(500) // 2.1.2 采用的是同步非阻塞逻辑,每隔500MS轮询一次
              if (ready > 0) {
                val keys = nioSelector.selectedKeys()
                val iter = keys.iterator()
                while (iter.hasNext && isRunning) {
                  try {
                    val key = iter.next
                    iter.remove()
                    if (key.isAcceptable)
                      accept(key, processors(currentProcessor)) // 2.1.2 将代码添加到newConnections队列之后返回
                    else
                      throw new IllegalStateException("Unrecognized key state for acceptor thread.")
    
                    // 2.1.2 当有请求到来的时候采用轮询的方式获取一个Processor线程处理请求
                    currentProcessor = (currentProcessor + 1) % processors.length 
                  } catch {
                    case e: Throwable => error("Error while accepting connection", e)
                  }
                }
              }
            }
            catch {
              // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
              // to a select operation on a specific channel or a bad request. We don't want the
              // the broker to stop responding to requests from other clients in these scenarios.
              case e: ControlThrowable => throw e
              case e: Throwable => error("Error occurred", e)
            }
          }
        } finally {
          debug("Closing server socket and selector.")
          swallowError(serverChannel.close())
          swallowError(nioSelector.close())
          shutdownComplete()
        }
      }
    

    2.1.1 注册OP_ACCEPT事件

    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

    2.1.2 内部逻辑

    此处采用的是同步非阻塞逻辑,每隔500MS轮询一次.

    当有请求到来的时候采用轮询的方式获取一个Processor线程处理请求,代码如下:

    之后将代码添加到newConnections队列之后返回,代码如下:

    def accept(socketChannel: SocketChannel) {  newConnections.add(socketChannel)  wakeup()}
    
    //newConnections是一个线程安全的队列,存放SocketChannel通道
    private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
    

    2.3 kafka.net.Processor

    override def run() {
        startupComplete()
        while (isRunning) {
          try {
            // setup any new connections that have been queued up
            configureNewConnections()
            // register any new responses for writing
            processNewResponses()
            poll()
            processCompletedReceives()
            processCompletedSends()
            processDisconnected()
          } catch {
            // We catch all the throwables here to prevent the processor thread from exiting. We do this because
            // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
            // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
            // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
            case e: ControlThrowable => throw e
            case e: Throwable =>
              error("Processor got uncaught exception.", e)
          }
        }
    
        debug("Closing selector - processor " + id)
        swallowError(closeAll())
        shutdownComplete()
      }
    

    先来重点看一下configureNewConnections这个方法:

    /**
       * Register any new connections that have been queued up
       */
      private def configureNewConnections() {
        while (!newConnections.isEmpty) {
          val channel = newConnections.poll()
          debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
          selector.register(connectionId, channel)
        }
      }
    

    循环判断NewConnections的大小,如果有值则弹出,并且注册为OP_READ读事件。
    再回到主逻辑看一下read方法。(这个方法估计在0.10的代码中被废弃了,没有找到。)

    1. 把当前SelectionKey和事件循环时间放入LRU映射表中,将来检查时回收连接资源。
    2. 建立BoundedByteBufferReceive对象,具体读取操作由这个对象的readFrom方法负责进行,返回读取的字节大小。
    • 如果读取完成,则修改状态为receive.complete,并通过requestChannel.sendRequest(req)将封装好的Request对象放到RequestQueue队列中。
    • 如果没有读取完成,则让selector继续侦听OP_READ事件。

    2.4 kafka.server.KafkaRequestHandler

    /**
     * A thread that answers kafka requests.
     */
    def run() {
        while(true) {
            var req : RequestChannel.Request = null
            while (req == null) {
              // We use a single meter for aggregate idle percentage for the thread pool.
              // Since meter is calculated as total_recorded_value / time_window and
              // time_window is independent of the number of threads, each recorded idle
              // time should be discounted by # threads.
              val startSelectTime = SystemTime.nanoseconds
              req = requestChannel.receiveRequest(300)
              val idleTime = SystemTime.nanoseconds - startSelectTime
              aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
            }
    
            if(req eq RequestChannel.AllDone) {
              debug("Kafka request handler %d on broker %d received shut down command".format(
                id, brokerId))
              return
            }
            req.requestDequeueTimeMs = SystemTime.milliseconds
            trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
            apis.handle(req)
        }
      } 
    

    KafkaRequestHandler也是一个事件处理线程,不断的循环读取requestQueue队列中的Request请求数据,其中超时时间设置为300MS,并将请求发送到 apis.handle方法中处理,并将请求响应结果放到responseQueue队列中去。

    参数 说明 对应方法
    RequestKeys.ProduceKey producer请求 ProducerRequest
    RequestKeys.FetchKey consumer请求 FetchRequest
    RequestKeys.OffsetsKey topic的offset请求 OffsetRequest
    RequestKeys.MetadataKey topic元数据请求 TopicMetadataRequest
    RequestKeys.LeaderAndIsrKey leader和isr信息更新请求 LeaderAndIsrRequest
    RequestKeys.StopReplicaKey 停止replica请求 StopReplicaRequest
    RequestKeys.UpdateMetadataKey 更新元数据请求 UpdateMetadataRequest
    RequestKeys.ControlledShutdownKey controlledShutdown请求 ControlledShutdownRequest
    RequestKeys.OffsetCommitKey commitOffset请求 OffsetCommitRequest
    RequestKeys.OffsetFetchKey consumer的offset请求 OffsetFetchRequest

    2.5 Processor响应数据处理

    def processNewResponses() {
        var curr = requestChannel.receiveResponse(id)
        while (curr != null) {
          try {
            curr.responseAction match {
              case RequestChannel.NoOpAction =>
                // There is no response to send to the client, we need to read more pipelined requests
                // that are sitting in the server's socket buffer
                curr.request.updateRequestMetrics
                trace("Socket server received empty response to send, registering for read: " + curr)
                selector.unmute(curr.request.connectionId)
              case RequestChannel.SendAction =>
                sendResponse(curr)
              case RequestChannel.CloseConnectionAction =>
                curr.request.updateRequestMetrics
                trace("Closing socket connection actively according to the response code.")
                close(selector, curr.request.connectionId)
            }
          } finally {
            curr = requestChannel.receiveResponse(id)
          }
        }
      }
    

    我们回到Processor线程类中,processNewRequest()方法是发送请求,那么会调用processNewResponses()来处理Handler提供给客户端的Response,把requestChannel中responseQueue的Response取出来,注册OP_WRITE事件,将数据返回给客户端。

  • 相关阅读:
    晕晕的一天
    23. 合并K个排序链表
    25. K 个一组翻转链表
    328. 奇偶链表
    86. 分隔链表
    290. 单词规律
    202. 快乐数
    242. 有效的字母异位词
    16.最接近的三数之和
    (转) java 简单工厂模式(实现一个计算器)
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6368010.html
Copyright © 2011-2022 走看看