zoukankan      html  css  js  c++  java
  • kafka.network.SocketServer分析

    当Kafka启动时,会启动这个SocketServer来接收客户端的连接,处理客户端请求,发送响应。

    这个类的注释说明了这个socket server的结构

    /**
    * An NIO socket server. The threading model is
    * 1 Acceptor thread that handles new connections
    * N Processor threads that each have their own selector and read requests from sockets
    * M Handler threads that handle requests and produce responses back to the processor threads for writing.
    */

    1个Acceptor,用来接收新的连接。
    N个Processor,每个Processor有自己的selector,Processor从sockets里读取请求,以及写response到sockets。
    M个Handler用来处理请求,并且产生response给Processor。

    其中:

    Acceptor监听新连接,如果有新连接,就分配给某个Processor,这个Processor会把这个SocketChannel注册给自己的Selector,注册为OP_READ。当这个SocketChannel可读,就从中读数据,产生Request,然后放入到RequestChannel的队列中。
    Processor还会从新RequestChannel中不断取Response,然后把Response对应的SocketChannel在自己的Selector上注册为OP_WRITE,当这个SocketChannel可写,就把数据写入。然后把这个SocketChannel注册为OP_READ,继续监听请求。

    其中使用的类主要包括:

    Acceptor :  它是一个SocketServer, 接受新的连接,并且分配连接给Processor

    Processor: 读取请求,发送响应

    Handler: 处理请求,产生响应。这里的Handler由kafka.server.RequestHandler实现。

    RequestChannel: 它包括了一个request queue 和 一个 response queue. 是Handler和Processsor交互时使用的队列。Request由Processor放入RequestChannel, 由Handler取出,然后把Response放回RequestChannel.

    Acceptor在接受连接后,就把相当的SocketChannel设成非阻塞模式。因此Processor对这些SocketChannel的读写都是使用Selector,采用非阻塞的处理模式。

    问题:

     (1) Acceptor是如何把新来的连接分配给对应的Processor,这个算法是什么?是round robin吗?

              在每接收一个请求后,调用
                   // round robin to the next processor thread
                  currentProcessor = (currentProcessor + 1 ) % processors .length
              而每个新的socketChannel分配的方式为:
              accept(key, processors(currentProcessor ))
              这个key就是Acceptor的Selector返回的SelectionKey
              因此,socketChannel分配给Processor的过程是round robin的
     

     (2) Processor应该把对应的SocketChannel在自己的Selector上如何注册?

              
              首先,Acceptor会把这个SocketChannel传送给对应的Processor:
              在收到一个新的连接时,Acceptor对它调用自己的accept方法
              accept(key: SelectionKey, processor: Processor)
     
              为了使用Selector,它会将新到的SocketChannel配置为非阻塞模式,然后配置sendBufferSize
              然后调用Processor的accept方法。
              然后,Processor会把这个SocketChannel加入到自己的newConnection队列中。
              在每个Processor内部有一个ConcurrentLinkedQueue
              private val newConnections = new ConcurrentLinkedQueue [SocketChannel]()
              然后Processor会处理这个新连接。
              Processor的accept方法实现为:
                  newConnections.add(socketChannel)
               wakeup()
              即,将新的socketChannel加到队列中,然后wakeup自己的selector。
              这会使得select从阻塞状态醒来,执行一次select()外层的while循环。在每次循环的开始,都会处理新的connection。
              configureNewConnections()
              这个方法的实现为:
             while( newConnections.size() > 0 ) {
               val channel = newConnections.poll()
               channel.register( selector, SelectionKey. OP_READ)
           }
              这个socketChannel被注册为OP_READ
              于是,当这个连接有请求过来,Processor的Selector就会从select方法中返回,Processor开始读取请求。
         

    (3) Processor如何读取请求?

              首先,如果一个SocketChannel可读。Processor在自己run方法的while循环中会从select方法中获得对应的SelectionKey。
              在Processor的run方法的while循环中:
                     if(key .isReadable)
                  read( key)
              
              read方法会从SocketChannel中读取并构造Request对象,然后把它发送给RequestChannel。
        它的实现为:
        
      /*
       * Process reads from ready sockets
       */
      def read(key: SelectionKey) {
        val socketChannel = channelFor(key) //获取可读的SocketChannel
        var receive = key.attachment.asInstanceOf[Receive] //获取attach到SelectionKey的Receive对象
        if(key.attachment == null) {  //如果attachment是空,说明这是第一次读,就新建一个Receive对象,attach到这个SocketChannel的SelectionKey上。如果不是空,说明之前已经从中读了一些数据,只是没读完。
          receive = new BoundedByteBufferReceive(maxRequestSize)
          key.attach(receive)
        }
        val read = receive.readFrom(socketChannel) //从SocketChannel中读数据
        val address = socketChannel.socket.getRemoteSocketAddress();
        trace(read + " bytes read from " + address)
        if(read < 0) { //如果读的数据数小于0,就关闭socket连接。实际上从BoundedByteBufferReceive的实现来看,read的值不会小于0
          close(key)
        } else if(receive.complete) {//如果读完了,就构造request,发送给requestChannel
          val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
          requestChannel.sendRequest(req)
          key.attach(null) //取消attach的Receive对象
          // explicitly reset interest ops to not READ, no need to wake up the selector just yet
          key.interestOps(key.interestOps & (~SelectionKey.OP_READ))//显示地把这个SocketChannel设为非OP_READ,等到Response发给这个SocketChannel以后,它会被再设为OP_READ,以继续处理来自这个SocketChannel的请求。
        } else {//如果没有读完,就把这个SocketChannel注册为OP_READ,然后wakeup对应的selector,继续从SocketChannel中读数据。所以下一次再处理这个SocketChannel时,attach到它的SelectionKey的Receive对象就不是空了。
          // more reading to be done
          trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
          key.interestOps(SelectionKey.OP_READ)
          wakeup()
        }
      }
    

      那么BoundedByteBufferReceive是如何知道一个请求读没读完呢?

      原来每个Request的前4个字节标识了这个Request有多长,BoundedByteBufferReceive从SocketChannel中读取前4个字节,转换成整形,以这个整数为大小构造一个ByteBuffer,如果这个ByteBuffer没有写满,就说明请求的内容还没有读完。receive.complete就不被设为true,否则就说明这个Request已经从channel中完全读出。

          if(!contentBuffer.hasRemaining) {
            contentBuffer.rewind()
            complete = true
          }
     在Kafka的Wire Format中有说明:
    Request Header (all single non-multi requests begin with this)
    0                   1                   2                   3
     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                       REQUEST_LENGTH                          |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |         REQUEST_TYPE          |        TOPIC_LENGTH           |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    /                                                               /
    /                    TOPIC (variable length)                    /
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                           PARTITION                           |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

     (4) Processor如何接收Handler产生的response?

              它会在run方法的while循环中获取RequestChannel中的Response,然后把它写到SocketChannel。
              中间的机制和Request类似。
  • 相关阅读:
    L161
    L160
    L159
    PyQt编程实战:画出QScrollArea的scrollAreaWidgetContents内容部署层的范围矩形
    PyQt(Python+Qt)学习随笔:QScrollArea滚动区域layout布局的作用及设置方法
    PyQt(Python+Qt)学习随笔:QScrollArea滚动区域的scrollAreaWidgetContents、widget及setWidget等相关概念解释
    PyQt(Python+Qt)学习随笔:怎么在QScrollArea滚动区域中展示子部件的超长内容?
    PyQt(Python+Qt)学习随笔:QScrollArea的widgetResizable属性
    PyQt(Python+Qt)学习随笔:QScrollArea的alignment属性不起作用的原因
    第二十四章、containers容器类部件QScrollArea滚动区域详解
  • 原文地址:https://www.cnblogs.com/devos/p/3780689.html
Copyright © 2011-2022 走看看