zoukankan      html  css  js  c++  java
  • kafka生产者网络层总结

    1 层次结构

    负责进行网络IO请求的是NetworkClient,主要层次结构如下
    network

    ClusterConnectionStates报存了每个节点的状态,以node为key,以node的状态为value;inFlightRequets中保存了每个节点已经发送的请求,但是还没有返回的请求,以node为key,以List<ClientRequest>为value。inFlightRequets从名字也可以看出,表示“正在空中飞”的请求。

    2 如何保证每次只发送一个请求

    sender线程启动后,如果RecordBatch中有消息,会将消息按照所在节点重新排列,每个节点会创建一个ClientRequest,用来发送,每个节点每次只能发送一个ClientRequest,如下
    KafkaChannel#setSend(..)

    public void setSend(Send send) {
            if (this.send != null) // 如果已经有send,会抛出异常
                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
            this.send = send;
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
     }
    

    那么kafka是如何保证避免setSend的时候KafkaChannel中已经没有send了呢,这个关键就是在sender线程中会调用NetworkClient#ready(..),会将没有ready的节点去除掉,从而不会在该节点上setSend:

    while (iter.hasNext()) {
                Node node = iter.next();
                if (!this.client.ready(node, now)) { // 关键
                    iter.remove();
                    notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
                }
    }
    

    3 NetworkClient#ready(..)

    NetworkClient#ready(..)检查节点是否准备好,从而决定是否可以将消息封装成ClientRequest放到KafkaChannel上。

    public boolean ready(Node node, long now) {
            if (node.isEmpty())
                throw new IllegalArgumentException("Cannot connect to empty node " + node);
    
            if (isReady(node, now)) // 关键
                return true;
    
            if (connectionStates.canConnect(node.idString(), now))
                initiateConnect(node, now);
    
            return false;
        }
    

    我们来分析下isReady

    public boolean isReady(Node node, long now) {
            return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
        }
    

    isReady主要两个条件,一个是判断metadata是否到了更新的时候了,如果metadata需要更新,那么就不发送本次请求,也就是metadata更新优先级高。第二个是判断这个节点是否canSendRequest。

    private boolean canSendRequest(String node) {
            return connectionStates.isConnected(node) && selector.isChannelReady(node) 
            && inFlightRequests.canSendMore(node); // 重点
        }
    

    inFlightRequests保存的是“正在空中飞”的请求

    public boolean canSendMore(String node) {
            Deque<ClientRequest> queue = requests.get(node);
            return queue == null || queue.isEmpty() ||
                   (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
        }
    

    满足以下几个条件,表示可以继续send

    1. queue是空,即该节点没有“正在空中飞”的request
    2. queue不为空。queue中排在最开头的request已经completed 并且queue的大小小于允许的最大值。如何理解呢?queue是一个双端队列,每次add的时候都会在queue的头部插入,所以queue中第一个就是正在发送的,同样也是KafkaChannel中的send。只有当send发送到网络中的时候才可以继续发送。这就保证了前面说的“如何保证每次只发送一个请求”。

    4 inFlightRequests

    inFlightRequests保存了"正在空中飞"的请求,所谓“正在空中飞”的意思就是request已经发送到了网络上,但是服务端还没有回ack。NetworkClient#doSend会往inFlightRequests头部放置一个request,同时会在KafkaChannel中放置一个request.send。

    public void add(ClientRequest request) {
            Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
            if (reqs == null) {
                reqs = new ArrayDeque<>();
                this.requests.put(request.request().destination(), reqs);
            }
            reqs.addFirst(request); // 重点,插入到头部
        }
    

    5 Selector#pollSelectionKeys

    Selector#pollSelectionKeys用来处理读写事件。先看写事件

    if (channel.ready() && key.isWritable()) {
                        Send send = channel.write();
                        if (send != null) {
                            this.completedSends.add(send); // 请求写完了会放到completedSends中
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }
    

    往网络中写的时候,会调用KafkaChannel#write来写。

    public Send write() throws IOException {
            Send result = null;
            if (send != null && send(send)) {
                result = send;
                send = null; // kafkaChannel中的send被置为null,这时候新的request可以发送了
            }
            return result;
        }
    
    private boolean send(Send send) throws IOException {
            send.writeTo(transportLayer);
            if (send.completed())
                transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    
            return send.completed();
        }
    

    发送可能一次不能完全发送完毕,需要发送好几次才能将request全部发送到网络上,只有这个request发送完毕了才能将KafkaChannel中的send置为null,新的request才可以setSend。但此时inFlightRequests并没有移除该request,也就是说该request还在"飞",但是新的request可以添加。发送完毕后会将channel的SelectionKey.OP_WRITE移除,没有发送完毕不会移除,下次轮询的时候该节点没有ready,不会添加新的request,会继续发送没有发完的request。

    对于ack=0,不要求服务端ack就表示发送成功。该处理是在NetworkClient#handleCompletedSends(..)中进行的

    private void handleCompletedSends(List<ClientResponse> responses, long now) {
            // if no response is expected then when the send is completed, return it
            for (Send send : this.selector.completedSends()) {
                ClientRequest request = this.inFlightRequests.lastSent(send.destination());
                if (!request.expectResponse()) { // ack = 0不需要服务端response
                    this.inFlightRequests.completeLastSent(send.destination()); // request从inFlightRequests中移除,表示此次请求完毕
                    responses.add(new ClientResponse(request, now, false, null));
                }
            }
        }
    

    对于ack !=0 ,则要求服务端ack才表示发送成功,该处理是在
    NetworkClient#handleCompletedReceives(..)中进行

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String source = receive.source();
                ClientRequest req = inFlightRequests.completeNext(source);
                Struct body = parseResponse(receive.payload(), req.request().header());
                if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                    responses.add(new ClientResponse(req, now, false, body));
            }
        }
    

    5 参考

    https://blog.csdn.net/chunlongyu/article/details/52651960

  • 相关阅读:
    Redis学习笔记
    Springboot + Tomcat跑项目出现端口被占用的问题
    按层打印二叉树
    打印二叉树的镜像——剑指offer
    判断树的子结构——剑指offer
    实习半个月的感想
    使用KMP算法判断是否为旋转词
    微信双开
    win10 右键添加cmd当前目录打开
    勒索邮件
  • 原文地址:https://www.cnblogs.com/set-cookie/p/8974963.html
Copyright © 2011-2022 走看看