  • An Analysis of Kafka's Network Model and Communication Flow

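    1. Overview

    Recently, some readers ran into questions about the network model and communication flow while studying Kafka's network communication. This post walks through that part of Kafka.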

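    2. Content

    As a message queue, Kafka involves network communication in two main ways:

    • Pull: a Consumer pulls message data from the message queue;
    • Push: a Producer pushes message data to the message queue.

    High-performance network communication can be built directly on a lower-level transport protocol such as TCP or UDP. Kafka defines its own communication protocol on top of TCP for traffic between Producer, Broker, and Consumer, and this protocol is tailored entirely to Kafka's own needs.

    Tip:
    UDP is an unreliable transport protocol, so Kafka uses TCP for communication between its services.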

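    2.1 Basic Data Types

    The basic data types in the communication protocol fall into the following groups:

    • Fixed-width types: for example, int8, int16, int32, and int64, which correspond to byte, short, int, and long in Java
    • Variable-length types: for example, string and bytes, each written as a length prefix followed by that many bytes of content
    • Arrays: for example, the counterparts of int[] and String[] in Java, written as an element count followed by the elements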
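
    To make these categories concrete, here is a minimal sketch of how such values could be laid out in a java.nio.ByteBuffer. It assumes the classic length-prefixed layout (an int16 length before a string, an int32 count before an array). WireTypes and its method names are hypothetical helpers for illustration, not part of Kafka's API.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helpers; fixed-width values (int8/16/32/64) need no helper because
// ByteBuffer.put/putShort/putInt/putLong already write them in big-endian order.
object WireTypes {
  // Variable-length string: int16 length prefix, then the UTF-8 bytes.
  def writeString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  def readString(buf: ByteBuffer): String = {
    val len = buf.getShort().toInt
    val bytes = new Array[Byte](len)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  // Array: int32 element count, then each element in order.
  def writeIntArray(buf: ByteBuffer, xs: Array[Int]): Unit = {
    buf.putInt(xs.length)
    xs.foreach(x => buf.putInt(x))
  }

  def readIntArray(buf: ByteBuffer): Array[Int] = {
    val n = buf.getInt()
    Array.fill(n)(buf.getInt())
  }
}
```

    A reader has to consume fields in exactly the order the writer produced them, because nothing in the byte stream labels the fields.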

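    2.2 Communication Model

    Kafka uses the multi-threaded Reactor model: a single Acceptor thread handles all new connections, and multiple Processor threads handle requests (for example, parsing the protocol, wrapping requests, and forwarding them).

    Tip:
    Reactor is an event-handling model in which requests can be submitted to one or more service handlers.
    When a request arrives from a Client, the Server uses a multiplexing and dispatch strategy: one non-blocking thread receives all requests and forwards them to the corresponding worker threads for processing.

    In later versions of Kafka, a Handler module was added that processes requests with a configurable number of threads. Handlers and Processors are connected through a blocking queue, so the overall structure is: Acceptor, then Processor threads, then a blocking queue, then Handler threads.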
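
    The Processor-to-Handler hand-off can be sketched with a bounded blocking queue and a fixed number of handler threads. This is a minimal illustration under stated assumptions, not Kafka's code: HandlerPoolSketch, Request, and the upper-casing "work" are all hypothetical.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

object HandlerPoolSketch {
  final case class Request(processorId: Int, payload: String)

  /** Feeds `requests` through a bounded queue to `numHandlers` threads and returns their results. */
  def run(requests: Seq[Request], numHandlers: Int, queueSize: Int): Set[String] = {
    val queue = new ArrayBlockingQueue[Request](queueSize)
    val results = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(requests.size)

    val handlers = (0 until numHandlers).map { i =>
      val t = new Thread(new Runnable {
        def run(): Unit =
          try {
            while (true) {
              val req = queue.take() // blocks until a request is available
              results.add("p" + req.processorId + ":" + req.payload.toUpperCase)
              done.countDown()
            }
          } catch { case _: InterruptedException => () } // interrupted: exit the loop
      }, "handler-" + i)
      t.setDaemon(true)
      t.start()
      t
    }

    // "Processor" side: put blocks when the queue is full, which applies back-pressure.
    requests.foreach(req => queue.put(req))
    done.await(5, TimeUnit.SECONDS)
    handlers.foreach(_.interrupt())

    val out = Set.newBuilder[String]
    val it = results.iterator()
    while (it.hasNext) out += it.next()
    out.result()
  }
}
```

    Because the handler count is just a parameter here, it can be changed without touching the producing side, which is the property the Handler module is meant to provide.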

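    Here, Acceptor is a thread class that extends AbstractServerThread. Its main job is to listen for and accept Client connections, set up the data channel (SocketChannel) for each one, and hand it to a Processor in round-robin fashion. The core logic is in Acceptor's run method: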

      def run() {
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
        startupComplete()
        try {
          var currentProcessor = 0
          while (isRunning) {
            try {
              val ready = nioSelector.select(500)
              if (ready > 0) {
                val keys = nioSelector.selectedKeys()
                val iter = keys.iterator()
                while (iter.hasNext && isRunning) {
                  try {
                    val key = iter.next
                    iter.remove()
                    if (key.isAcceptable)
                      accept(key, processors(currentProcessor))
                    else
                      throw new IllegalStateException("Unrecognized key state for acceptor thread.")
    
                    // round robin to the next processor thread
                    currentProcessor = (currentProcessor + 1) % processors.length
                  } catch {
                    case e: Throwable => error("Error while accepting connection", e)
                  }
                }
              }
            }
            catch {
              // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
              // to a select operation on a specific channel or a bad request. We don't want
              // the broker to stop responding to requests from other clients in these scenarios.
              case e: ControlThrowable => throw e
              case e: Throwable => error("Error occurred", e)
            }
          }
        } finally {
          debug("Closing server socket and selector.")
          swallowError(serverChannel.close())
          swallowError(nioSelector.close())
          shutdownComplete()
        }
      }
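
    The round-robin step in run() above can be isolated into a few lines. The sketch below is only an illustration of the index arithmetic; RoundRobinSketch and assign are hypothetical names, and it assumes every accepted connection advances the counter by one.

```scala
object RoundRobinSketch {
  /** For each connection in arrival order, returns the index of the processor it is handed to. */
  def assign(numConnections: Int, numProcessors: Int): Seq[Int] = {
    require(numProcessors > 0, "need at least one processor")
    var current = 0
    (0 until numConnections).map { _ =>
      val chosen = current
      current = (current + 1) % numProcessors // wrap around after the last processor
      chosen
    }
  }
}
```

    With 3 processors, 7 connections are spread as 0, 1, 2, 0, 1, 2, 0, so no processor receives more than one connection more than any other.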

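    The network package also contains a blocking channel (BlockingChannel). As the code shows, it opens a blocking SocketChannel to a given host and port with timeouts applied, and sends requests and reads responses over it. The queue that connects Processor and Handler is the RequestChannel covered in Section 3. The BlockingChannel code is as follows: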

    class BlockingChannel( val host: String, 
                           val port: Int, 
                           val readBufferSize: Int, 
                           val writeBufferSize: Int, 
                           val readTimeoutMs: Int ) extends Logging {
      private var connected = false
      private var channel: SocketChannel = null
      private var readChannel: ReadableByteChannel = null
      private var writeChannel: GatheringByteChannel = null
      private val lock = new Object()
      private val connectTimeoutMs = readTimeoutMs
      private var connectionId: String = ""
    
      def connect() = lock synchronized  {
        if(!connected) {
          try {
            channel = SocketChannel.open()
            if(readBufferSize > 0)
              channel.socket.setReceiveBufferSize(readBufferSize)
            if(writeBufferSize > 0)
              channel.socket.setSendBufferSize(writeBufferSize)
            channel.configureBlocking(true)
            channel.socket.setSoTimeout(readTimeoutMs)
            channel.socket.setKeepAlive(true)
            channel.socket.setTcpNoDelay(true)
            channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
    
            writeChannel = channel
            // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
            // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
            readChannel = Channels.newChannel(channel.socket().getInputStream)
            connected = true
            val localHost = channel.socket.getLocalAddress.getHostAddress
            val localPort = channel.socket.getLocalPort
            val remoteHost = channel.socket.getInetAddress.getHostAddress
            val remotePort = channel.socket.getPort
            connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
            // settings may not match what we requested above
            val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
            debug(msg.format(channel.socket.getSoTimeout,
                             readTimeoutMs,
                             channel.socket.getReceiveBufferSize, 
                             readBufferSize,
                             channel.socket.getSendBufferSize,
                             writeBufferSize,
                             connectTimeoutMs))
    
          } catch {
            case _: Throwable => disconnect()
          }
        }
      }
      
      def disconnect() = lock synchronized {
        if(channel != null) {
          swallow(channel.close())
          swallow(channel.socket.close())
          channel = null
          writeChannel = null
        }
        // closing the main socket channel *should* close the read channel
        // but let's do it to be sure.
        if(readChannel != null) {
          swallow(readChannel.close())
          readChannel = null
        }
        connected = false
      }
    
      def isConnected = connected
    
      def send(request: RequestOrResponse): Long = {
        if(!connected)
          throw new ClosedChannelException()
    
        val send = new RequestOrResponseSend(connectionId, request)
        send.writeCompletely(writeChannel)
      }
      
      def receive(): NetworkReceive = {
        if(!connected)
          throw new ClosedChannelException()
    
        val response = readCompletely(readChannel)
        response.payload().rewind()
    
        response
      }
    
      private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
        val response = new NetworkReceive
        while (!response.complete())
          response.readFromReadableChannel(channel)
        response
      }
    
    }

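    3. Communication Process

    Kafka's communication framework has evolved across versions. Older versions of Kafka are built on NIO: multiple Socket connections are registered with one Selector, so a single thread can monitor and manage many connections, which greatly reduces the resource overhead of using one thread per connection.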
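
    The idea of one thread watching many connections through a single Selector can be shown with a small echo server. This is a sketch under stated assumptions rather than Kafka's implementation: SelectorSketch, EchoServer, and echoOverManyConnections are hypothetical names, messages are assumed to be small, and the write loop would need OP_WRITE handling in real code.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets.UTF_8

object SelectorSketch {
  final class EchoServer {
    private val selector = Selector.open()
    private val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: let the OS pick a free port
    server.configureBlocking(false)
    server.register(selector, SelectionKey.OP_ACCEPT)
    @volatile private var running = true
    val port: Int = server.socket().getLocalPort
    private val thread = new Thread(new Runnable { def run(): Unit = loop() }, "selector-thread")
    thread.setDaemon(true)

    def start(): Unit = thread.start()
    def stop(): Unit = { running = false; selector.wakeup(); thread.join(2000) }

    // A single thread serves the listening socket and every accepted connection.
    private def loop(): Unit = {
      val buf = ByteBuffer.allocate(1024)
      try {
        while (running) {
          selector.select(500)
          val iter = selector.selectedKeys().iterator()
          while (iter.hasNext) {
            val key = iter.next()
            iter.remove()
            if (key.isAcceptable) {
              val client = server.accept()
              if (client != null) {
                client.configureBlocking(false)
                client.register(selector, SelectionKey.OP_READ)
              }
            } else if (key.isReadable) {
              val ch = key.channel().asInstanceOf[SocketChannel]
              buf.clear()
              val n = try ch.read(buf) catch { case _: IOException => -1 }
              if (n < 0) { key.cancel(); ch.close() } // peer closed the connection
              else { buf.flip(); while (buf.hasRemaining) ch.write(buf) } // echo back
            }
          }
        }
      } finally { selector.close(); server.close() }
    }
  }

  /** Opens one connection per message first, then sends and reads on each of them. */
  def echoOverManyConnections(port: Int, msgs: Seq[String]): Seq[String] = {
    val channels = msgs.map(_ => SocketChannel.open(new InetSocketAddress("127.0.0.1", port)))
    try {
      msgs.zip(channels).map { case (msg, ch) =>
        val out = ByteBuffer.wrap(msg.getBytes(UTF_8))
        while (out.hasRemaining) ch.write(out)
        val in = ByteBuffer.allocate(out.capacity())
        while (in.hasRemaining && ch.read(in) >= 0) {}
        new String(in.array(), 0, in.position(), UTF_8)
      }
    } finally channels.foreach(_.close())
  }
}
```

    All client connections are open at the same time, yet the server side never starts more than one thread.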

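    Newer versions of Kafka are still built on NIO and still use the multi-threaded Reactor model. The difference is that the business-logic module (the Handler module) has been split out and runs in its own thread pool.

    Kafka's communication flow can be summarized as follows:

    • When a Client sends a request to the Server, the Acceptor accepts the TCP connection and, once it is established, passes it to a Processor thread;
    • The Processor thread registers the new connection with its own Selector and listens for READ events;
    • When the Client writes data on that connection, a READ event fires; the Processor reads the request according to the TCP-based protocol and passes it on to a Handler for processing;
    • When the Handler finishes, it may have a result to return to the Client; that result is wrapped in a Response and sent back over the same connection.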

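    Splitting the Handler module out in newer versions of Kafka has the following advantages:

    • The number of Handler threads can be set independently, which makes tuning and management easier
    • A single oversized request cannot block a Processor thread
    • Request, Handler, and Response are all connected through queues, so they are not coupled to one another, which helps Kafka's performance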

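    Note that in Kafka's network layer, RequestChannel is the buffer through which Processor threads and Handler threads exchange data: it is where Requests and Responses are queued. A Processor thread adds each request it reads to RequestChannel's global request queue (requestQueue). A Handler thread takes the request from that queue and processes it, then adds the Response to the response queue (responseQueues) of the Processor that owns the connection, and the responseListeners wake that Processor thread. Finally, the Processor thread takes the Response from its response queue and sends it to the Client. The implementation is as follows: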

    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
    
      newGauge(
        "RequestQueueSize",
        new Gauge[Int] {
          def value = requestQueue.size
        }
      )
    
      newGauge("ResponseQueueSize", new Gauge[Int]{
        def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
      })
    
      for (i <- 0 until numProcessors) {
        newGauge("ResponseQueueSize",
          new Gauge[Int] {
            def value = responseQueues(i).size()
          },
          Map("processor" -> i.toString)
        )
      }
    
      /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
      def sendRequest(request: RequestChannel.Request) {
        requestQueue.put(request)
      }
    
      /** Send a response back to the socket server to be sent over the network */
      def sendResponse(response: RequestChannel.Response) {
        responseQueues(response.processor).put(response)
        for(onResponse <- responseListeners)
          onResponse(response.processor)
      }
    
      /** No operation to take for the request, need to read more over the network */
      def noOperation(processor: Int, request: RequestChannel.Request) {
        responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
        for(onResponse <- responseListeners)
          onResponse(processor)
      }
    
      /** Close the connection for the request */
      def closeConnection(processor: Int, request: RequestChannel.Request) {
        responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
        for(onResponse <- responseListeners)
          onResponse(processor)
      }
    
      /** Get the next request or block until specified time has elapsed */
      def receiveRequest(timeout: Long): RequestChannel.Request =
        requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
    
      /** Get the next request or block until there is one */
      def receiveRequest(): RequestChannel.Request =
        requestQueue.take()
    
      /** Get a response for the given processor if there is one */
      def receiveResponse(processor: Int): RequestChannel.Response = {
        val response = responseQueues(processor).poll()
        if (response != null)
          response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
        response
      }
    
      def addResponseListener(onResponse: Int => Unit) {
        responseListeners ::= onResponse
      }
    
      def shutdown() {
        requestQueue.clear()
      }
    }
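
    To see the request and response queues working together, here is a stripped-down stand-in for the class above. It is a sketch, not Kafka's API: MiniRequestChannel, Request, Response, and demo are hypothetical, metrics are omitted, and the listener wake-up is replaced by a timed poll on the response queue to keep the example short.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

object RequestChannelSketch {
  final case class Request(processor: Int, body: String)
  final case class Response(processor: Int, body: String)

  final class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
    // One bounded queue shared by all processors for incoming requests.
    private val requestQueue = new ArrayBlockingQueue[Request](queueSize)
    // One response queue per processor, so a response returns to the thread that owns the connection.
    private val responseQueues: Array[BlockingQueue[Response]] =
      Array.fill[BlockingQueue[Response]](numProcessors)(new LinkedBlockingQueue[Response]())

    def sendRequest(r: Request): Unit = requestQueue.put(r)
    def receiveRequest(timeoutMs: Long): Request =
      requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    def sendResponse(r: Response): Unit = responseQueues(r.processor).put(r)
    def receiveResponse(processor: Int, timeoutMs: Long): Response =
      responseQueues(processor).poll(timeoutMs, TimeUnit.MILLISECONDS)
  }

  /** Runs one handler thread and plays the role of two processors on the calling thread. */
  def demo(): Seq[Response] = {
    val channel = new MiniRequestChannel(2, 10)
    val handler = new Thread(new Runnable {
      def run(): Unit = {
        var req = channel.receiveRequest(1000)
        while (req != null) { // stop once the queue has stayed empty for a second
          channel.sendResponse(Response(req.processor, "ack:" + req.body))
          req = channel.receiveRequest(1000)
        }
      }
    }, "handler")
    handler.setDaemon(true)
    handler.start()

    channel.sendRequest(Request(0, "produce"))
    channel.sendRequest(Request(1, "fetch"))
    Seq(channel.receiveResponse(0, 2000), channel.receiveResponse(1, 2000))
  }
}
```

    Each response carries the id of the processor that submitted the request, which is how it is routed to the right response queue.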

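    4. Summary

    Reading and analyzing the code of Kafka's network communication layer teaches a good deal about network programming with NIO. Studying the Kafka source code in this way also helps with performance tuning and troubleshooting on large Kafka clusters.

    5. Closing Remarks

    That is all for this post. If you run into problems while studying this material, you can join the discussion group or send me an email, and I will do my best to answer. Let's keep learning together!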


  • Original article: https://www.cnblogs.com/smartloli/p/12287130.html