Kafka Principles and Practice (3): spring-kafka Producer Source Code

Series table of contents

Kafka Principles and Practice (1): A 10-Minute Introduction to the Principles

Kafka Principles and Practice (2): A Simple spring-kafka Walkthrough

Kafka Principles and Practice (3): spring-kafka Producer Source Code

Kafka Principles and Practice (4): spring-kafka Consumer Source Code

Kafka Principles and Practice (5): spring-kafka Configuration Explained

Kafka Principles and Practice (6): Summary and Takeaways

==================== Main content ====================

Since the project runs on Spring Cloud and pulls in spring-boot-starter, the default supported version is spring-kafka-1.1.7, so this article analyzes the spring-kafka-1.1.7 source code. Although the official release is already at 2.0, the core methods analyzed here are essentially unchanged (the original post links to the official documentation).

1. KafkaProducer Send Model

As the send-model diagram in the original post shows, a send request initiated by KafkaTemplate goes through the following steps:

Phase 1: putting data into the pool

1. KafkaProducer starts sending the message.

2. The producer interceptors intercept the message.

3. The serializers serialize the key and value.

4. The partitioner chooses the partition for the message.

5. The record is appended to the record accumulator.

Phase 2: sending data with NIO

6. When the number of buffered records reaches the batch-send threshold, or a new RecordBatch is created, the Sender thread is woken up immediately to execute its run method.

7. Inside the Sender, the RecordBatches to send are drained from the accumulator's deques and converted into ClientRequest objects.

8. Still inside the Sender, the NetworkClient converts each request into a RequestSend (the Send interface) and calls the Selector to stage it on a KafkaChannel (the NetworkClient maintains the channel map Map<String, KafkaChannel> channels).

9. NIO sends the message: (1) Selector.select(); (2) the Send data (ByteBuffer[]) staged on the KafkaChannel is written to the channel's GatheringByteChannel.
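To make the entry point of this pipeline concrete, here is a minimal raw-client sketch (not taken from the original post) that exercises steps 1-5 and the callback that the Sender I/O thread completes. The broker address, topic name, key, and value are assumptions for illustration only.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RawProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // step 1: the KafkaProducer is the entry point of the send pipeline
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // steps 2-5 (interceptors, serialization, partitioning, accumulator append) all happen inside send()
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // invoked on the Sender I/O thread once the broker responds (or the send fails)
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());
                    }
                }
            });
        } // close() waits for outstanding batches before stopping the I/O thread
    }
}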

2. KafkaTemplate

spring-kafka provides a simple KafkaTemplate class: you just call its send methods. All that is required is to register the bean with the container (see the XML bean configuration in part two of this series).
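Besides the XML configuration from part two, the same bean can be declared with Java config. The following is a hedged sketch rather than the project's actual configuration; the broker address and the StringSerializer choices are assumptions:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // autoFlush defaults to false, matching the single-argument constructor shown below
        return new KafkaTemplate<>(producerFactory());
    }
}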

public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

    ...

    /**
     * Create an instance using the supplied producer factory and autoFlush false.
     * @param producerFactory the producer factory.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * Set autoFlush to true if you wish to synchronously interact with Kafka, calling
     * {@link java.util.concurrent.Future#get()} on the result.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }

    ...

    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

    ...
}

Key points of the KafkaTemplate source:

1. The constructors take a ProducerFactory and an autoFlush flag (whether to flush, i.e. immediately send, the buffered records after each send).

2. doSend sends the message; there are two key points here:

1) producer.send(producerRecord, Callback), where producer is the KafkaProducer;

2) the Callback's onCompletion is invoked when the send completes and drives onSuccess or onError on the producer listener.
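For reference, a minimal usage sketch of the two points above: the send returns a ListenableFuture, and success and failure surface through a callback. The topic, key, and value here are made-up examples, not from the original post.

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class TemplateSendExample {

    public static void sendWithCallback(KafkaTemplate<String, String> template) {
        // point 1: send() delegates to doSend(), which calls KafkaProducer.send(record, callback)
        ListenableFuture<SendResult<String, String>> future = template.send("demo-topic", "key-1", "hello");

        // point 2: completion is surfaced through the future that onCompletion() sets
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("send failed: " + ex.getMessage());
            }
        });
    }
}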

3. KafkaProducer

3.1 Constructing a KafkaProducer

@SuppressWarnings({"unchecked", "deprecation"})
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();

        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true, we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }

        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }

        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);

        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

As shown above, the KafkaProducer wires together several core components:

1) Metadata: maintains cluster and topic information.

2) RecordAccumulator: buffers produced records so they can be sent in batches, reducing the number of I/O round trips and improving throughput.

3) Sender: combines the Metadata, the RecordAccumulator, and the NetworkClient.

4) KafkaThread (the I/O thread): a named thread wrapping the Sender (a Runnable); once the thread is started, it runs the Sender's run method.

/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

/**
 * Run a single iteration of sending
 *
 * @param now
 *            The current POSIX time in milliseconds
 */
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions in the cluster that are ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if the leader of some partitions is still unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any ready nodes the NetworkClient is not yet ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests for the ready nodes
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // expire timed-out batches
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // if any node is ready, set the poll timeout to 0
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // 1. if some partitions are ready, the poll timeout is 0;
    // 2. otherwise, if some partitions have accumulated data but are not yet ready, the poll
    //    timeout is the delta between now and the linger expiry time;
    // 3. otherwise, the poll timeout is the delta between now and the metadata expiry time.
    this.client.poll(pollTimeout, now);
}

The created requests are then iterated and sent with client.send(request, now); the NetworkClient sends each ClientRequest:

@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

As shown above, Selector.send ultimately looks up the KafkaChannel for the destination node and stages the Send (the payload and its destination) on that channel.

client.poll(pollTimeout, now) then performs the actual I/O reads and writes:

@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // after the I/O has run, build the various ClientResponses and add them to responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // iterate over the responses and invoke their callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}

What does the core method selector.poll ultimately do?

public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}

As shown above, there are two core pieces of logic: selecting the channels that are ready, and writing data to them.

1) select: waits for channels to become ready and returns the number of ready channels.

private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}

As shown above, nioSelector is simply the JDK's java.nio.channels.Selector; the call boils down to invoking its select method, which reports how many channels have become ready since the previous select() call.

Selector.select(ms) blocks for at most ms milliseconds, waiting for a channel to become ready for one of the events it is registered for.

Selector.selectNow() never blocks: it returns immediately whether or not any channel is ready, and returns zero if no channel has become selectable.

NIO Selector

1. The Java NIO model

There is too much to cover here, so it is left for a separate post.

2. Selector

The original post illustrates the Selector with a diagram at this point.
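As a standalone illustration of the JDK Selector pattern that Kafka's own Selector wraps, here is a hedged, minimal sketch (not Kafka code); the port number and the interest operations are assumptions chosen only to show the select loop:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioSelectorSketch {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9090)); // assumed local port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // like Selector.select(ms) in the Kafka path above: blocks for at most 500 ms
            int readyKeys = selector.select(500);
            if (readyKeys == 0) {
                continue; // no channel became ready within the timeout
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // same pattern as pollSelectionKeys: remove the key before handling it
                if (key.isAcceptable()) {
                    SocketChannel socket = server.accept();
                    socket.configureBlocking(false);
                    socket.register(selector, SelectionKey.OP_READ); // readable events will now be selected
                }
            }
        }
    }
}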

2) pollSelectionKeys: if the number of ready channels is greater than 0, writes the data (ByteBuffer) to the channel associated with each selection key.

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* prepare the channel */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* read data from the channel */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* write data to the channel */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}

3.2 KafkaProducer: sending data

KafkaProducer.send:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously sending a record to a topic
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try { // serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try { // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }

        int partition = partition(record, serializedKey, serializedValue, cluster);
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        // topic and partition
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future; // return the Future
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

Key points of this method:

1. The data to be sent (the TopicPartition plus the serialized key and value) is appended to the RecordAccumulator.

2. sender.wakeup(): when a batch in the accumulator is full (or a new batch has been created), the Sender is woken up so that it stops blocking in its current select() call.

/**
 * Add a record to the accumulator and return the result, which contains the Future and
 * flags indicating whether the appended batch is full or a new batch was created.
 *
 * @param tp the topic/partition the record is being sent to
 * @param timestamp the timestamp of the record
 * @param key the serialized key of the record
 * @param value the serialized value of the record
 * @param callback the callback to execute when the request is complete
 * @param maxTimeToBlock the maximum number of milliseconds to block
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // increment the count of threads currently appending to the accumulator
    // (abortIncompleteBatches uses this count as a condition)
    appendsInProgress.incrementAndGet();
    try {
        // get the deque for key = tp from ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches, creating it if absent
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // lock the deque, then try to append to the accumulator
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) // 1. the append succeeded, return directly
                return appendResult;
        }
        // ===== 2. the append failed =====
        // 2.1 allocate a buffer and try the append again
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) { // lock the deque again and retry the append
            // re-check that the producer has not been closed after acquiring the deque lock
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // 2.2 the append succeeded this time, release the buffer
                free.deallocate(buffer);
                return appendResult;
            }
            // 2.3 the append still failed, build a writable MemoryRecords
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch); // add to the IncompleteRecordBatches
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        // decrement the count of in-progress appends
        appendsInProgress.decrementAndGet();
    }
}

Looking at the append method above, adding a record to the accumulator involves three calls to tryAppend: the first two take a Deque as their last parameter, while the last one takes a timestamp in milliseconds. Following the first two tryAppend calls:

/**
 * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
 * resources (like compression streams buffers).
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}

As shown above, this in turn calls tryAppend(timestamp, key, value, callback, time.milliseconds()); following that call:

/**
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

As shown above, this final tryAppend writes the current record into the RecordBatch's MemoryRecords (which wraps the ByteBuffer and related state) and returns a FutureRecordMetadata.

The result is finally wrapped in a RecordAppendResult and returned; at this point, a record has been added to the accumulator.
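The accumulator behavior just described is driven by a handful of producer configs. A hedged sketch of typical settings follows; the values are illustrative assumptions, not recommendations, and the broker address is made up:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class AccumulatorTuningSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // target size in bytes of each RecordBatch in the accumulator
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // extra wait so a partially filled batch can gather more records
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total memory the BufferPool (the 'free' field above) may hand out
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);     // how long append()/waitOnMetadata may block before failing
        return props;
    }
}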

Back in KafkaTemplate's doSend method: once KafkaProducer.send has submitted the message, if autoFlush is enabled, KafkaProducer.flush() is executed:

@Override
public void flush() {
    log.trace("Flushing accumulated records in producer.");
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
        this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
        throw new InterruptException("Flush interrupted.", e);
    }
}

KafkaProducer.flush() ==> accumulator.awaitFlushCompletion() ==> RecordBatch.produceFuture.await()

/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}

private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Await the completion of this request
 */
public void await() throws InterruptedException {
    latch.await();
}

As shown above, awaitFlushCompletion iterates over the incomplete RecordBatches and, on each batch's ProduceRequestResult, waits on a CountDownLatch (initialized with a count of 1) until the send completes.
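Putting the flush path in caller terms: with autoFlush enabled, every doSend blocks until all in-flight batches finish; a caller can instead get per-record synchronous behavior by blocking on the returned future, as the KafkaTemplate javadoc above suggests. A hedged sketch (the topic name and timeout are assumptions):

import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class SyncSendSketch {

    public static void sendAndWait(KafkaTemplate<String, String> template) throws Exception {
        // flush() (via autoFlush) waits for every incomplete batch; blocking on one future
        // waits only for this record's acknowledgement
        SendResult<String, String> result =
                template.send("demo-topic", "hello").get(10, TimeUnit.SECONDS); // assumed topic and timeout
        System.out.println("acked offset = " + result.getRecordMetadata().offset());
    }
}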

4. Summary

In this chapter we walked through the main source code of the Kafka producer send path, starting from KafkaTemplate and following the flow diagram. It boils down to two modules: storing records in the accumulator cache, and the Sender pushing the messages out over Java NIO. The producer send path turns out to be fairly simple. The next chapter covers the consumer source code.
