  • Kafka producer activity diagram

    When using the Kafka producer API there are two main phases: creating the producer instance, and calling the producer instance to send data (synchronously or asynchronously). Creating the producer instance also starts a sender thread, which keeps polling: it refreshes metadata, reads data from the accumulator, and actually sends it to a broker. The walkthrough below roughly describes the internal call flow of the producer API.

    Creating the producer instance:

    1: The client reads the producer config; a sample looks like this:

    {
    	security.protocol=SASL_PLAINTEXT,
    	bootstrap.servers=server1.com:8881,server2:8882,
    	value.serializer=xxx.serialization.avro.AvroWithSchemaSpecificSer,
    	key.serializer=org.apache.kafka.common.serialization.LongSerializer,
    	client.id=15164@hostname,
    	acks=all
    }

    2: Call the following constructor to create the producer instance:

    Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
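
    As a concrete illustration, here is a minimal, hypothetical sketch of wiring the sample config above into that constructor. The broker addresses are placeholders, and a plain StringSerializer stands in for the custom Avro value serializer:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSetup {
        public static Producer<Long, String> createProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1.com:8881,server2:8882"); // placeholder hosts
            props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-client"); // illustrative client id
            // this overload passes the serializers directly instead of via
            // key.serializer / value.serializer entries in the config
            return new KafkaProducer<>(props, new LongSerializer(), new StringSerializer());
        }
    }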

    3: The KafkaProducer instance is the main entry point of the producer and wraps all of the components called later. While the producer instance is being constructed, the following components are created in order.

    Create the Metadata instance, passing refreshBackoffMs (the minimum refresh interval, retry.backoff.ms, default 100 ms) and metadataExpireMs (the maximum metadata retention time, metadata.max.age.ms, default 300000 ms).

    The metadata holds information about topics, partitions, and brokers.

    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

    4: Create the accumulator (RecordAccumulator)

    The accumulator is a bounded in-memory queue that buffers messages. When the client sends data, the record is appended to the tail of the queue; if the buffer memory is exhausted, the append call blocks. Inside the accumulator, the batches member holds the data waiting to be sent by the sender thread:

    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

    The accumulator is constructed as follows:

    this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), //batch.size, default 16384 (16K): bytes per record batch
                        this.totalMemorySize, //buffer.memory, default 33554432 (32M): total buffer memory in bytes
                        this.compressionType, //compression.type, default none: the producer's compression type; valid values are none, gzip, snappy, lz4
                        config.getLong(ProducerConfig.LINGER_MS_CONFIG), //linger.ms: how long to wait for more records before sending a batch
                        retryBackoffMs, //retry.backoff.ms, default 100 ms: backoff before retrying
                        metrics,
                        time);
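
    To make the structure concrete, the following is a much-simplified, hypothetical sketch of the idea behind the accumulator: one deque per topic-partition, records appended at the tail by callers of send and drained from the head by the sender thread. It uses plain String keys and byte[] payloads instead of Kafka's TopicPartition and RecordBatch:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustration only: not Kafka's actual RecordAccumulator
    public class MiniAccumulator {
        private final ConcurrentMap<String, Deque<byte[]>> batches = new ConcurrentHashMap<>();

        public void append(String topicPartition, byte[] record) {
            Deque<byte[]> deque = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
            synchronized (deque) {
                deque.addLast(record);    // new records go to the tail
            }
        }

        public byte[] drainOne(String topicPartition) {
            Deque<byte[]> deque = batches.get(topicPartition);
            if (deque == null) return null;
            synchronized (deque) {
                return deque.pollFirst(); // the sender drains from the head
            }
        }
    }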

    5: Create the channel builder instance. ChannelBuilder is an interface; depending on the security protocol, the concrete implementation is SaslChannelBuilder, SslChannelBuilder, or PlaintextChannelBuilder. A channel is the part of Java NIO that actually performs the I/O; in Kafka, the KafkaChannel class wraps a SocketChannel. Login authentication is performed when the builder instance is created.

    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
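
    Which ChannelBuilder gets created is driven by security.protocol in the config. As an illustration only (the exact SASL properties depend on the client version and the mechanism in use), a SASL_PLAINTEXT setup might look like:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;

    public class SaslConfigSketch {
        public static Properties saslProps() {
            Properties props = new Properties();
            // SASL_PLAINTEXT / SASL_SSL -> SaslChannelBuilder, SSL -> SslChannelBuilder,
            // PLAINTEXT -> PlaintextChannelBuilder
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN"); // hypothetical mechanism choice
            // newer clients accept the JAAS entry inline; older clients read the file
            // named by the java.security.auth.login.config system property instead
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"demo\" password=\"demo-secret\";");
            return props;
        }
    }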

    6: Create the network client instance. NetworkClient wraps the low-level network access as well as metadata updates.

                NetworkClient client = new NetworkClient(
                        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                        this.metadata,
                        clientId,
                        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                        this.requestTimeoutMs, time);

    The NetworkClient constructor is given a Selector instance, a wrapper around the Java NIO selector. The Selector constructor opens the underlying java.nio selector, and the ChannelBuilder created earlier is also handed to this internal Selector member of the NetworkClient:

    this.nioSelector = java.nio.channels.Selector.open();

    7: Create the Sender thread class and start the thread, passing it the NetworkClient instance, the Metadata instance, and the accumulator created above. The Sender is the key class: all of the real work happens in this thread. Once started, it polls continuously, updating metadata and sending messages.

                this.sender = new Sender(client,
                        this.metadata,
                        this.accumulator,
                        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                        (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                        config.getInt(ProducerConfig.RETRIES_CONFIG),
                        this.metrics,
                        new SystemTime(),
                        clientId,
                        this.requestTimeoutMs);
                String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
                this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
                this.ioThread.start();
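
    KafkaThread is essentially a Thread marked as a daemon, so the network thread does not keep the JVM alive on its own. A minimal, hypothetical sketch of the same pattern (names are illustrative, not Kafka's):

    public class IoThreadSketch {
        public static void main(String[] args) throws InterruptedException {
            Runnable loop = () -> {
                while (!Thread.currentThread().isInterrupted()) {
                    // in Kafka's Sender this is where metadata is refreshed,
                    // the accumulator is drained, and network I/O is performed
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
            Thread ioThread = new Thread(loop, "demo-network-thread");
            ioThread.setDaemon(true); // same idea as KafkaThread(name, runnable, daemon = true)
            ioThread.start();
            Thread.sleep(300);        // let the loop run briefly before the JVM exits
        }
    }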

    Sending data:

    1: The client sends data

    The client builds a ProducerRecord and calls send to send the message. send is asynchronous: it returns a Future and comes back immediately, having only written the record into the in-memory queue (RecordAccumulator). To send synchronously and confirm whether the message was delivered, call get on the returned Future, which blocks the calling thread.

    ProducerRecord<K, V> producerRecord = new ProducerRecord<>("topic", data);
    producer.send(producerRecord, new ProducerCallBack(requestId));

    When calling send you can pass in a callback object; the callback is invoked to handle the ack once the send completes.
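
    A small, hedged sketch of both styles (the topic name and key/value types are illustrative; ProducerCallBack above is the application's own class, here replaced with an inline Callback):

    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendExamples {

        // asynchronous send: the callback fires when the broker acks or the send fails
        static void sendAsync(Producer<Long, String> producer, long key, String value) {
            producer.send(new ProducerRecord<>("demo-topic", key, value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace(); // the send ultimately failed
                    } else {
                        System.out.println("acked " + metadata.topic() + "-"
                                + metadata.partition() + "@" + metadata.offset());
                    }
                }
            });
        }

        // synchronous send: block the calling thread until the ack (or failure) arrives
        static RecordMetadata sendSync(Producer<Long, String> producer, long key, String value)
                throws Exception {
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", key, value));
            return future.get(); // throws if the send fails
        }
    }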

    2: The record is saved into the accumulator

    Inside send, if interceptors are configured, they are applied to the record first, and the processed record is then passed to doSend:

    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback); //asynchronously send a record to the topic

    Before doSend appends the record to the accumulator, a few things have to happen. First, waitOnMetadata is called to make sure metadata containing the partitions for the given topic is available. Inside waitOnMetadata, if there is no partition information, it requests a metadata update in a loop and wakes up the sender thread (sender.wakeup()) to refresh the metadata; if the blocking time limit is exceeded, it exits with a timeout exception.

    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); //maximum time KafkaProducer.send() may block, max.block.ms, default 60000 ms
    while (metadata.fetch().partitionsForTopic(topic) == null) {
                log.trace("Requesting metadata update for topic {}.", topic);
                int version = metadata.requestUpdate();
                sender.wakeup(); //wake up the sender thread to refresh metadata
                metadata.awaitUpdate(version, remainingWaitMs); //wait for the update; throws a timeout exception if max.block.ms is exceeded
                ......
    }

    If the metadata is updated successfully before max.block.ms is reached, the record's key and value are serialized, the record is appended to the accumulator, and the sender thread is woken up again:

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

    At this stage the data has only been written into an intermediate buffer; it has not actually been transmitted to a broker.
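
    Because send only buffers, code that must know the data has reached the broker before moving on either blocks on the returned Future (as above) or calls flush. A minimal sketch, with an illustrative topic name:

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FlushSketch {
        static void sendBatchAndWait(Producer<Long, String> producer) {
            for (long i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("demo-topic", i, "payload-" + i));
            }
            // flush() blocks until every record handed to send() so far has been
            // drained from the accumulator and either acknowledged or failed
            producer.flush();
        }
    }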

    The sender thread

    1: After the sender thread starts, it executes Sender.run(now) in a loop. The main job of this function is to drain data out of the accumulator and turn it into producer requests, a List<ClientRequest>.

    2: If there are producer requests to send, NetworkClient.send is called. Internally it calls doSend(ClientRequest, long), which adds the request to the in-flight queue inFlightRequests:

    {NetworkClient}    

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request); //add the request to the in-flight queue
        selector.send(request.request()); //the data is staged in the KafkaChannel's buffer
    }

    3: Selector.send(Send) is then called. It looks up the destination KafkaChannel from the Send object and stages the data in that channel's pending-send buffer; at this point the data still has not been transmitted to the broker.

    {Selector} 
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination()); //look up the destination channel
        try {
            channel.setSend(send); //stage the data in the destination channel's buffer
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

    4: Once all the data drained from the accumulator has been placed into the corresponding channels, NetworkClient.poll is called; this is where the actual socket reads and writes happen.

    5-8: Metadata is updated first. In maybeUpdate, if the metadata timeout (metadataTimeout) has dropped to 0, the metadata needs refreshing. The least-loaded node is chosen (leastLoadedNode); if no connection to it exists yet, a socket connection is created first: step 7 calls Selector.connect, and step 8 calls SocketChannel.open and SocketChannel.connect to establish the socket connection.

    {NetworkClient}
    
            public long maybeUpdate(long now) {
                // should we update our metadata?
                long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
                long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
                long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
                // if there is no node available to connect, back off refreshing metadata
                long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                        waitForMetadataFetch);
    
                if (metadataTimeout == 0) {
                    // Beware that the behavior of this method and the computation of timeouts for poll() are
                    // highly dependent on the behavior of leastLoadedNode.
                    Node node = leastLoadedNode(now);
                    maybeUpdate(now, node);
                }
    
                return metadataTimeout;
            }
    
    private void maybeUpdate(long now, Node node) {
                if (node == null) {
                    log.debug("Give up sending metadata request since no node is available");
                    // mark the timestamp for no node available to connect
                    this.lastNoNodeAvailableMs = now;
                    return;
                }
                String nodeConnectionId = node.idString();
    
                if (canSendRequest(nodeConnectionId)) { //if the connection is already established, the request can be sent
                    this.metadataFetchInProgress = true;
                    MetadataRequest metadataRequest;
                    if (metadata.needMetadataForAllTopics())
                        metadataRequest = MetadataRequest.allTopics();
                    else
                        metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                    doSend(clientRequest, now); //send the request: the data is staged in the channel's buffer
                } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                    // we don't have a connection to this node right now, make one
                    log.debug("Initialize connection to node {} for sending metadata request", node.id());
                    initiateConnect(node, now); //create the socket connection
                    // If initiateConnect failed immediately, this node will be put into blackout and we
                    // should allow immediately retrying in case there is another candidate node. If it
                    // is still connecting, the worst case is that we end up setting a longer timeout
                    // on the next round and then wait for the response.
                } else { // connected, but can't send more OR connecting
                    // In either case, we just need to wait for a network event to let us know the selected
                    // connection might be usable again.
                    this.lastNoNodeAvailableMs = now;
                }
            }

    9: After the socket connection is established, it is registered with the nioSelector, and the channelBuilder created earlier is used to build the KafkaChannel. The KafkaChannel creates and wraps the transport layer (TransportLayer), which in turn wraps the SocketChannel; the channel is stored in a Map, and the KafkaChannel also holds the authenticator created for it.

    {Selector}
    
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); //register with the Selector
         KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); //build the KafkaChannel
         key.attach(channel);
         this.channels.put(id, channel);

    10: If the node is connected and the channel is ready (the transport layer is ready and the authenticator has completed), then in maybeUpdate above the metadata update request can be sent: NetworkClient.doSend is called, and the request is placed into the in-flight queue (inFlightRequests) and staged in the corresponding KafkaChannel. If a request sitting in inFlightRequests exceeds the request timeout (request.timeout.ms, default 30000, i.e. 30 seconds), the socket connection carrying that request is closed and a metadata refresh is requested.

    11: Selector.poll is called to perform the actual I/O. For each selection key that is ready, the corresponding KafkaChannel is obtained from the key, and channel.write sends the data to the broker.
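
    Stripped of Kafka's wrappers, the connect / select / write cycle in steps 7-11 follows the standard java.nio pattern. A bare, hypothetical skeleton (host, port, and payload are placeholders):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    public class NioWriteSketch {
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            SocketChannel socket = SocketChannel.open();
            socket.configureBlocking(false);
            socket.connect(new InetSocketAddress("server1.com", 8881));
            socket.register(selector, SelectionKey.OP_CONNECT);
            ByteBuffer pending = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

            while (pending.hasRemaining()) {
                selector.select(1000);                        // the poll step
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isConnectable() && socket.finishConnect()) {
                        k.interestOps(SelectionKey.OP_WRITE); // connection established
                    } else if (k.isWritable()) {
                        socket.write(pending);                // the actual socket write
                    }
                }
                selector.selectedKeys().clear();
            }
            socket.close();
            selector.close();
        }
    }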

  • Original article: https://www.cnblogs.com/benfly/p/9360563.html