  • Kafka 0.10 Metadata: Supplementary Notes

    What is Metadata? It is the mapping between topics/partitions and brokers: the leader and follower information for every partition of every topic.

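    Where is it stored? It is persisted in ZooKeeper and held in broker memory at runtime. The producer keeps its own cached copy in the Metadata class, which is what the rest of this post looks at.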

    1. The two Metadata update mechanisms

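    1. Periodic update: the metadata is refreshed at a fixed interval. This is implemented with two fields of Metadata, lastRefreshMs and lastSuccessfulRefreshMs. The corresponding ProducerConfig setting is:

      • metadata.max.age.ms // default 300000, i.e. once every 5 minutes
    2. Forced update on invalidation: once the metadata is judged to be stale, metadata.requestUpdate() is called to force an update. requestUpdate() does very little by itself; it only sets needUpdate to true.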
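
The periodic interval from item 1 is an ordinary producer setting. The snippet below is a minimal sketch of overriding it with plain java.util.Properties; the class name MetadataAgeConfigSketch, the helper producerProps and the broker address are placeholders invented for illustration. In a real application the resulting properties would be handed to the KafkaProducer constructor.

```java
import java.util.Properties;

class MetadataAgeConfigSketch {
    // Builds producer properties with a custom periodic metadata refresh interval.
    static Properties producerProps(long maxAgeMs) {
        Properties props = new Properties();
        // Placeholder broker address, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        // Same key that ProducerConfig.METADATA_MAX_AGE_CONFIG names; the default is 300000 ms.
        props.put("metadata.max.age.ms", Long.toString(maxAgeMs));
        return props;
    }
}
```

A shorter interval makes the producer notice leader changes sooner even when no error occurs, at the cost of more MetadataRequest traffic.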

    Every time the Sender polls, both mechanisms are checked, and an update is triggered as soon as either condition is met.
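
How the two mechanisms combine into one "is it time yet?" check can be sketched as follows. This is a simplified model loosely based on Metadata.timeToNextUpdate, not the actual Kafka class: the name MetadataRefreshSketch is hypothetical, and the field names simply mirror the ones mentioned above.

```java
class MetadataRefreshSketch {
    private final long refreshBackoffMs;   // minimum gap between two refresh attempts
    private final long metadataExpireMs;   // periodic interval (metadata.max.age.ms)
    private long lastRefreshMs;            // last attempt, successful or not
    private long lastSuccessfulRefreshMs;  // last successful refresh
    private boolean needUpdate;            // set by a forced update request

    MetadataRefreshSketch(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Mechanism 2: only raises the flag; the next poll performs the refresh.
    synchronized void requestUpdate() {
        this.needUpdate = true;
    }

    // Returns 0 when a refresh should be sent now, otherwise the time left to wait.
    synchronized long timeToNextUpdate(long nowMs) {
        // Mechanism 1: the periodic expiry, short-circuited when an update was forced.
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // Even a forced update has to respect the backoff since the last attempt.
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    synchronized void update(long nowMs) {
        this.needUpdate = false;
        this.lastRefreshMs = nowMs;
        this.lastSuccessfulRefreshMs = nowMs;
    }

    synchronized void failedUpdate(long nowMs) {
        this.lastRefreshMs = nowMs;
    }
}
```

With a backoff of 100 ms and an expiry of 300000 ms, a refresh at t=1000 followed by requestUpdate() makes timeToNextUpdate(1050) return 50: the forced update is honoured, but only once the backoff has elapsed.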

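    So how is the Metadata judged to be stale? The logic is scattered through the code: many different places can decide that the Metadata is no longer valid.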

    2. Detecting stale Metadata

    Condition 1: a connection attempt fails in initiateConnect - NetworkClient.java

    private void initiateConnect(Node node, long now) {
            String nodeConnectionId = node.idString();
            try {
                log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
                this.connectionStates.connecting(nodeConnectionId, now);
                selector.connect(nodeConnectionId,
                                 new InetSocketAddress(node.host(), node.port()),
                                 this.socketSendBuffer,
                                 this.socketReceiveBuffer);
            } catch (IOException e) {
                /* attempt failed, we'll try again after the backoff */
                connectionStates.disconnected(nodeConnectionId, now);
                /* maybe the problem is our metadata, update it */
                metadataUpdater.requestUpdate();  // treat the metadata as stale
                log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
            }
        }
    

    Condition 2: a connection is found to be broken during I/O in poll - NetworkClient.java

    private void handleDisconnections(List<ClientResponse> responses, long now) {
            for (String node : this.selector.disconnected()) {
                log.debug("Node {} disconnected.", node);
                processDisconnection(responses, node, now);
            }
            if (this.selector.disconnected().size() > 0)
                metadataUpdater.requestUpdate();  // treat the metadata as stale
        }
    

    Condition 3: a request times out - NetworkClient.java

    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
            List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
            for (String nodeId : nodeIds) {
                this.selector.close(nodeId);
                log.debug("Disconnecting from node {} due to request timeout.", nodeId);
                processDisconnection(responses, nodeId, now);
            }
    
            if (nodeIds.size() > 0)
                metadataUpdater.requestUpdate();  // treat the metadata as stale
        }
    

    Condition 4: when sending messages, the leader of some partition cannot be found - Sender.java

    public void run(long now) {
            Cluster cluster = metadata.fetch();
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
            if (result.unknownLeadersExist)
                this.metadata.requestUpdate();
    

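    Condition 5: the response does not correspond to a successful request, e.g. the node was disconnected and the batch is completed with NETWORK_EXCEPTION - Sender.java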

    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
            int correlationId = response.request().request().header().correlationId();
            if (response.wasDisconnected()) {
                log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                                      .request()
                                                                                                      .destination());
                for (RecordBatch batch : batches.values())
                    completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
    

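    In short: whenever any kind of exception occurs or the data looks out of sync, the metadata is assumed to be possibly wrong and an update is requested.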
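
The common pattern behind all five conditions can be sketched as a single rule: any failure that might be explained by stale metadata raises the update flag. The code below is only an illustration; the SendError enum and the choice of which errors count as metadata-related are assumptions of this sketch, not the client's real Errors enum.

```java
class InvalidationSketch {
    // Hypothetical error categories; the flag says whether stale metadata is a plausible cause.
    enum SendError {
        NONE(false),
        NETWORK_EXCEPTION(true),
        NOT_LEADER_FOR_PARTITION(true),
        MESSAGE_TOO_LARGE(false);

        final boolean suspectsMetadata;

        SendError(boolean suspectsMetadata) {
            this.suspectsMetadata = suspectsMetadata;
        }
    }

    private boolean needUpdate = false;

    synchronized void requestUpdate() {
        this.needUpdate = true;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Called for every completed send; only raises the flag, the refresh happens on a later poll.
    void onSendResult(SendError error) {
        if (error.suspectsMetadata)
            requestUpdate();
    }
}
```

Because the handlers only set a flag, they stay cheap and can be called from many places without coordinating with each other.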

    3. Characteristics of a Metadata update

    Metadata updates also have the following characteristics:

    1. The MetadataRequest is sent asynchronously over NIO. The Metadata is only actually updated later, when the MetadataResponse is processed as the Sender's poll returns.

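    One key point here: the cluster object held by Metadata is replaced as a whole on every update rather than patched in place, so no locking is needed inside cluster.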
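
The whole-object replacement can be illustrated with an immutable snapshot plus a reference swap. This is a sketch under simplifying assumptions: ClusterSnapshotSketch and Snapshot are hypothetical names, and the snapshot holds only a partition-to-leader map instead of a full Cluster.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ClusterSnapshotSketch {
    // Immutable view of the cluster: partition name -> leader broker id.
    static final class Snapshot {
        private final Map<String, Integer> leaders;

        Snapshot(Map<String, Integer> leaders) {
            // Defensive copy, so later changes to the caller's map cannot leak in.
            this.leaders = Collections.unmodifiableMap(new HashMap<String, Integer>(leaders));
        }

        Integer leaderFor(String partition) {
            return leaders.get(partition);
        }
    }

    private Snapshot current = new Snapshot(new HashMap<String, Integer>());

    // Only the reference is guarded; the snapshot itself is never mutated.
    synchronized Snapshot fetch() {
        return current;
    }

    synchronized void update(Snapshot fresh) {
        this.current = fresh;
    }
}
```

A thread that fetched the old snapshot keeps a consistent view while it works, and simply picks up the new one on its next fetch().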

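    2. For the update, the client picks the least loaded of all the Nodes (i.e. Brokers) stored in the metadata, meaning the one with the fewest requests currently in flight. It sends the MetadataRequest to that node and obtains a new Cluster object.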
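
The selection rule can be sketched as a minimum search over in-flight request counts. This is a simplified illustration: LeastLoadedNodeSketch and its map parameter are hypothetical, and the sketch ignores connection state, which a real client would also have to consider.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LeastLoadedNodeSketch {
    // Returns the node id with the fewest in-flight requests, or null if no node is known.
    static String leastLoaded(Map<String, Integer> inFlightByNode) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> entry : inFlightByNode.entrySet()) {
            // Strict comparison: on a tie the node seen first is kept.
            if (entry.getValue() < bestCount) {
                best = entry.getKey();
                bestCount = entry.getValue();
            }
        }
        return best;
    }

    // Small demo with three brokers.
    static String demo() {
        Map<String, Integer> inFlight = new LinkedHashMap<String, Integer>();
        inFlight.put("broker-0", 3);
        inFlight.put("broker-1", 1);
        inFlight.put("broker-2", 2);
        return leastLoaded(inFlight);
    }
}
```

A null result corresponds to the "no node is available" branch in maybeUpdate, where the client backs off instead of sending the request.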

    4. Updating Metadata in the Sender's poll()

    As the code below shows, the Metadata is updated inside the while loop, each time client.poll() is called.

    Sender is a thread class (a Runnable) that belongs to KafkaProducer.

    public void run() {
            // main loop, runs until close is called
            while (running) {
                try {
                    run(time.milliseconds());
                } catch (Exception e) {
                    log.error("Uncaught error in kafka producer I/O thread: ", e);
                }
            }
           ...
        }
    
        public void run(long now) {
            Cluster cluster = metadata.fetch();
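    ...
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);   // walk the queued batches and collect the nodes that are ready to receive data
    
            if (result.unknownLeadersExist)  // some partition with pending data has no known leader: request a metadata update
                this.metadata.requestUpdate();
    
      ...
    
         // the client's two key methods: one sends a ClientRequest, the other receives ClientResponses. Both sit on top of NIO polling, which will be covered in detail later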
            for (ClientRequest request : requests)
                client.send(request, now);
    
            this.client.poll(pollTimeout, now);
        }
    
    //NetworkClient
        public List<ClientResponse> poll(long timeout, long now) {
            long metadataTimeout = metadataUpdater.maybeUpdate(now); // key point: every poll checks whether the metadata needs updating
    
            try {
                this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
            } catch (IOException e) {
                log.error("Unexpected error during I/O", e);
            }
    
            // process completed actions
            long updatedNow = this.time.milliseconds();
            List<ClientResponse> responses = new ArrayList<>();
            handleCompletedSends(responses, updatedNow);
            handleCompletedReceives(responses, updatedNow);   // the metadata update itself is handled inside this receive handler
            handleDisconnections(responses, updatedNow);
            handleConnections();
            handleTimedOutRequests(responses, updatedNow);
    
            // invoke callbacks
            for (ClientResponse response : responses) {
                if (response.request().hasCallback()) {
                    try {
                        response.request().callback().onComplete(response);
                    } catch (Exception e) {
                        log.error("Uncaught error in request completion:", e);
                    }
                }
            }
    
            return responses;
        }
    
     //DefaultMetadataUpdater
             @Override
            public long maybeUpdate(long now) {
                // should we update our metadata?
                long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
                long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
                long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
                // if there is no node available to connect, back off refreshing metadata
                long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                        waitForMetadataFetch);
    
                if (metadataTimeout == 0) {
                    // highly dependent on the behavior of leastLoadedNode.
                    Node node = leastLoadedNode(now);  // find the least loaded Node
                    maybeUpdate(now, node); // send the metadata update request to that Node
                }
    
                return metadataTimeout;
            }
    
            private void maybeUpdate(long now, Node node) {
                if (node == null) {
                    log.debug("Give up sending metadata request since no node is available");
                    // mark the timestamp for no node available to connect
                    this.lastNoNodeAvailableMs = now;
                    return;
                }
                String nodeConnectionId = node.idString();
    
                if (canSendRequest(nodeConnectionId)) {
                    Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
                    this.metadataFetchInProgress = true;
                    ClientRequest metadataRequest = request(now, nodeConnectionId, topics);  // key point: build the request that updates the Metadata
                    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                    doSend(metadataRequest, now); // asynchronous send only; the response is handled in handleCompletedReceives above
                } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                    log.debug("Initialize connection to node {} for sending metadata request", node.id());
                    initiateConnect(node, now);
    
                } else { // connected, but can't send more OR connecting
                    this.lastNoNodeAvailableMs = now;
                }
            }
    
         private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String source = receive.source();
                ClientRequest req = inFlightRequests.completeNext(source);
                ResponseHeader header = ResponseHeader.parse(receive.payload());
                // Always expect the response version id to be the same as the request version id
                short apiKey = req.request().header().apiKey();
                short apiVer = req.request().header().apiVersion();
                Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
                correlate(req.request().header(), header);
                if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                    responses.add(new ClientResponse(req, now, false, body));
            }
        }
    
    
            @Override
            public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
                short apiKey = req.request().header().apiKey();
                if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                    handleResponse(req.request().header(), body, now);
                    return true;
                }
                return false;
            }
    
    // the key method
            private void handleResponse(RequestHeader header, Struct body, long now) {
                this.metadataFetchInProgress = false;
                MetadataResponse response = new MetadataResponse(body);
                Cluster cluster = response.cluster();   // build a new cluster object from the response
                if (response.errors().size() > 0) {
                    log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
                }
    
                if (cluster.nodes().size() > 0) {
                    this.metadata.update(cluster, now);   // update the metadata: the new cluster replaces the old one
                } else {
                    log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                    this.metadata.failedUpdate(now);  // the update failed: run the failure handling
                }
            }
    
    
    // on success: increment version and update the other fields as well
        public synchronized void update(Cluster cluster, long now) {
            this.needUpdate = false;
            this.lastRefreshMs = now;
            this.lastSuccessfulRefreshMs = now;
            this.version += 1;
    
            for (Listener listener: listeners)
                listener.onMetadataUpdate(cluster);  // notify anyone listening for metadata updates
    
            this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;    // the new cluster replaces the old one
    
            notifyAll();  // wake up all blocked producer threads
    
            log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
        }
    
    // on failure: only lastRefreshMs is updated
        public synchronized void failedUpdate(long now) {
            this.lastRefreshMs = now;
        }
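
The version counter and the notifyAll() call in update() exist so that producer threads can block until fresh metadata arrives. The waiting side can be sketched as below; MetadataWaitSketch is a hypothetical stand-in, and returning a boolean on timeout is a choice made for this sketch rather than the real client's behaviour.

```java
class MetadataWaitSketch {
    private int version = 0;

    synchronized int version() {
        return version;
    }

    // Called by the I/O thread after a successful refresh.
    synchronized void update() {
        this.version += 1;
        notifyAll(); // wake every thread blocked in awaitUpdate
    }

    // Blocks until the version moves past lastVersion; returns false if maxWaitMs elapses first.
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        // Loop to guard against spurious wakeups and against updates that are still too old.
        while (this.version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            wait(remaining);
        }
        return true;
    }
}
```

A caller would typically read version(), trigger a forced update, and then call awaitUpdate with the version it read, so that an update arriving in between is not missed.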
    
  • Original article: https://www.cnblogs.com/byrhuangqiang/p/6377961.html