Kafka (4): Reading the KafkaProducer Source Code

    1. The doSend() method

    Every message in Kafka corresponds to a ProducerRecord object.

    
    public class ProducerRecord<K, V> {
    
        private final String topic;
        private final Integer partition;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Long timestamp;
    
    }
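
    For illustration, a ProducerRecord can be built with or without an explicit partition and timestamp. This is a minimal sketch; the topic name, key and value below are made up for the example:

    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical topic, key and value, purely for illustration
    ProducerRecord<String, String> r1 =
            new ProducerRecord<>("demo-topic", "user-42", "hello");            // partition left to the partitioner
    ProducerRecord<String, String> r2 =
            new ProducerRecord<>("demo-topic", 0, "user-42", "hello");         // partition 0 chosen explicitly
    ProducerRecord<String, String> r3 =
            new ProducerRecord<>("demo-topic", null, System.currentTimeMillis(),
                    "user-42", "hello");                                       // explicit timestamp, partition still null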
    
    
    

    The source of doSend() is as follows:

    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            TopicPartition tp = null;
            try {
                throwIfProducerClosed();
                // first make sure the metadata for the topic is available
                ClusterAndWaitTime clusterAndWaitTime;
    
                // 1. Fetch the cluster metadata
                try {
                    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
                } catch (KafkaException e) {
                    if (metadata.isClosed())
                        throw new KafkaException("Producer closed while send in progress", e);
                    throw e;
                }
                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
    
                
                // 2. Serialize the key and value
                byte[] serializedKey;
                try {
                    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in key.serializer", cce);
                }
                byte[] serializedValue;
                try {
                    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in value.serializer", cce);
                }
    
                
                // 3. Determine the partition via the partitioner
                int partition = partition(record, serializedKey, serializedValue, cluster);
                // Wrap the topic and partition into a TopicPartition; its string form is topic + "-" + partition
                tp = new TopicPartition(record.topic(), partition);
    
                setReadOnly(record.headers());
                Header[] headers = record.headers().toArray();
    
                int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                        compressionType, serializedKey, serializedValue, headers);
                ensureValidRecordSize(serializedSize);
                long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
                log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
                // producer callback will make sure to call both 'callback' and interceptor callback
                Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    
                if (transactionManager != null && transactionManager.isTransactional())
                    transactionManager.maybeAddPartitionToTransaction(tp);
    
    
                // 4. Append the record to the accumulator
                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs);
    
                // 5. If the batch is full or a new batch was created, wake up the Sender thread to send data
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
    
                
                return result.future;
    
            } catch (Exception e) {
                // ...
            }
        }
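
    KafkaProducer.send() delegates to doSend() and returns a Future<RecordMetadata> immediately, while the callback fires once the batch is acknowledged or fails. A minimal usage sketch (the broker address and topic name are placeholders):

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "k", "v");
                // Asynchronous send: the callback runs later on the producer's I/O (Sender) thread
                Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("sent to %s-%d@%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
                // Blocking on the future makes the send effectively synchronous
                RecordMetadata acked = future.get();
                System.out.println("acknowledged offset: " + acked.offset());
            }
        }
    }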
    
    
    

    Fetching the cluster metadata

    The relevant code in doSend():

    
                try {
                    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
                } catch (KafkaException e) {
                    if (metadata.isClosed())
                        throw new KafkaException("Producer closed while send in progress", e);
                    throw e;
                }
                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
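
    The maxBlockTimeMs budget used by waitOnMetadata() comes from the max.block.ms producer setting (waitOnMetadata() and the later BufferPool allocation share this budget). A small configuration sketch with an illustrative value:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    // How long send() may block waiting for metadata or for buffer memory; 60 seconds is the default
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);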
    
    

    Definition of the Cluster class:

    public final class Cluster {
    
        private final boolean isBootstrapConfigured;
        // list of broker nodes
        private final List<Node> nodes;
        private final Set<String> unauthorizedTopics;
        private final Set<String> invalidTopics;
        private final Set<String> internalTopics;
    
        // the node on which the controller resides
        private final Node controller;
    
        // Maps each TopicPartition to its PartitionInfo
        // TopicPartition holds only two fields: topic and partition
        // PartitionInfo records the detailed partition information: the leader node, the replica nodes (as a Node array), the ISR, and the offline replicas
        private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    
        // mapping from topic to its partitions
        private final Map<String, List<PartitionInfo>> partitionsByTopic;
        private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
        private final Map<Integer, List<PartitionInfo>> partitionsByNode;
        private final Map<Integer, Node> nodesById;
        private final ClusterResource clusterResource;
    
    }
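
    These fields are exposed through accessor methods, so code holding a Cluster snapshot (such as the one inside clusterAndWaitTime) can look up partition and leader information directly. A small sketch; describeTopic is a hypothetical helper, not Kafka code:

    import java.util.List;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    // Print where the leader of each partition of a topic lives
    static void describeTopic(Cluster cluster, String topic) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        for (PartitionInfo p : partitions) {
            Node leader = p.leader();
            System.out.printf("%s-%d leader=%s replicas=%d isr=%d%n",
                    p.topic(), p.partition(),
                    leader == null ? "none" : leader.idString(),
                    p.replicas().length, p.inSyncReplicas().length);
        }
    }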
    
    

    The clusterAndWaitTime object as shown in the debugger (screenshot omitted)

    2. The partition() method

    When a record has not been assigned a partition, the partition() method is called to pick one.

    The parameters passed in are the record's topic, its key and value (both the objects and their serialized bytes), and the cluster metadata.

    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            
            // Get the partition list for this topic from the cluster metadata
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            
            // Check whether the key is null
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // If the key is not null, hash it and take the result modulo the number of partitions
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
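
    The same idea can be packaged as a custom Partitioner. The sketch below is illustrative (the class name is made up) and only covers the keyed case plus a simple keyless fallback; it is not the real DefaultPartitioner, which keeps a per-topic counter for keyless records:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    public class KeyHashPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                // No key: pick a random partition instead of the round-robin counter
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            // Same formula as above: positive murmur2 hash of the key modulo the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

    It can be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyHashPartitioner.class.getName()).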
    
    

    3. The append() method

    The accumulator (RecordAccumulator) object is created in the KafkaProducer constructor.

    RecordAccumulator

    RecordAccumulator is mainly used to buffer messages so that the Sender thread can send them in batches, which reduces the overhead of network transfers and improves performance.

    Inside RecordAccumulator, each partition maintains its own Deque, and each ProducerBatch in that Deque contains one or more ProducerRecords.

    When a message (ProducerRecord) is written into the cache (RecordAccumulator), it is appended to the tail of a double-ended queue (Deque). The append procedure: find the Deque for the target partition, take the last ProducerBatch in that Deque, and check whether the record can still be appended to it; if not, create a new ProducerBatch and append the record to that. The size of a ProducerBatch is determined by the batch.size parameter.

    The Sender reads messages from the head of the deque.
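
    Since batch.size (together with linger.ms and the total buffer memory) controls how batches are formed, the behavior described above is tuned from the producer configuration. A sketch with illustrative values:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    // Target size of one ProducerBatch in bytes (default 16384); a record larger than this
    // still gets its own batch, since append() uses Math.max(batchSize, estimated record size)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
    // How long a not-yet-full batch may wait in the deque before it becomes sendable
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    // Total memory of the BufferPool that backs the accumulator (see the BufferPool section below)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);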

    Definition of the RecordAccumulator class:

    
    public final class RecordAccumulator {
    
        private final Logger log;
        private volatile boolean closed;
        private final AtomicInteger flushesInProgress;
        private final AtomicInteger appendsInProgress;
        private final int batchSize;
        private final CompressionType compression;
        private final int lingerMs;
        private final long retryBackoffMs;
        private final int deliveryTimeoutMs;
        private final BufferPool free;
        private final Time time;
        private final ApiVersions apiVersions;
    
        // Instantiated in the RecordAccumulator constructor: this.batches = new CopyOnWriteMap<>();
        private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
        
    
        private final IncompleteBatches incomplete;
    
    
    
        // The following variables are only accessed by the sender thread, so we don't need to protect them.
        private final Map<TopicPartition, Long> muted;
        private int drainIndex;
        private final TransactionManager transactionManager;
        private long nextBatchExpiryTimeMs = Long.MAX_VALUE; 
    
    }
    
    
    

    Debugging result (screenshot omitted)

    The append() method:

    
    
    public RecordAppendResult append(TopicPartition tp,
                                         long timestamp,
                                         byte[] key,
                                         byte[] value,
                                         Header[] headers,
                                         Callback callback,
                                         long maxTimeToBlock) throws InterruptedException {
            // We keep track of the number of appending thread to make sure we do not miss batches in
            // abortIncompleteBatches().
            appendsInProgress.incrementAndGet();
            ByteBuffer buffer = null;
            if (headers == null) headers = Record.EMPTY_HEADERS;
            try {
                
                // check if we have an in-progress batch
            // As noted above, batches is a ConcurrentMap<TopicPartition, Deque<ProducerBatch>>
            // Check whether this tp already exists in batches; return its Deque if so, otherwise create one
            // The Deque implementation here is ArrayDeque
                Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    
                synchronized (dq) {
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                    if (appendResult != null)
                        return appendResult;
                }
                
            // The block below turns the record into a buffer, wraps it in a ProducerBatch, and puts it into dq
                // we don't have an in-progress record batch try to allocate a new batch
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                buffer = free.allocate(size, maxTimeToBlock);
                synchronized (dq) {
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
    
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                    if (appendResult != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        return appendResult;
                    }
    
                    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
    
                    dq.addLast(batch);
                    incomplete.add(batch);
    
                    // Don't deallocate this buffer in the finally block as it's being used in the record batch
                    buffer = null;
                    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
                }
            } finally {
                if (buffer != null)
                    free.deallocate(buffer);
                appendsInProgress.decrementAndGet();
            }
        }
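
    Note that the second tryAppend() under the lock is not redundant: the buffer is allocated outside the synchronized block (free.allocate() may block), so another thread may have created a batch with free space in the meantime. A toy model of this "try, allocate outside the lock, try again" pattern (not Kafka code; batches are just size-capped StringBuilders here):

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class ToyAccumulator {
        private final Deque<StringBuilder> dq = new ArrayDeque<>();
        private final int batchSize;

        ToyAccumulator(int batchSize) { this.batchSize = batchSize; }

        void append(String record) {
            synchronized (dq) {
                // First attempt: piggyback on the last open batch if it still has room
                if (tryAppend(record)) return;
            }
            // "Allocate" a new batch outside the lock (stands in for the potentially blocking BufferPool call)
            StringBuilder fresh = new StringBuilder(batchSize);
            synchronized (dq) {
                // Re-check: another thread may have added a batch with room while we allocated
                if (tryAppend(record)) return;
                fresh.append(record);
                dq.addLast(fresh);
            }
        }

        // Must be called while holding the lock on dq
        private boolean tryAppend(String record) {
            StringBuilder last = dq.peekLast();
            if (last != null && last.length() + record.length() <= batchSize) {
                last.append(record);
                return true;
            }
            return false;
        }
    }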
    
    
    

    4. BufferPool

    Creating and releasing ByteBuffers is relatively expensive, so the Kafka client uses a BufferPool to reuse ByteBuffers.

    
    public class BufferPool {
    
        private final long totalMemory;
        private final int poolableSize;
        private final ReentrantLock lock;
        private final Deque<ByteBuffer> free;
        // ...
    }
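
    A simplified sketch of the reuse idea (not the real BufferPool, which also enforces the totalMemory limit and blocks or times out waiting threads): only buffers of exactly poolableSize, which corresponds to batch.size, are kept on the free list.

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    final class SimpleBufferPool {
        private final int poolableSize;
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        SimpleBufferPool(int poolableSize) { this.poolableSize = poolableSize; }

        synchronized ByteBuffer allocate(int size) {
            if (size == poolableSize && !free.isEmpty()) {
                ByteBuffer buffer = free.pollFirst();   // reuse a pooled buffer
                buffer.clear();
                return buffer;
            }
            return ByteBuffer.allocate(Math.max(size, poolableSize));
        }

        synchronized void deallocate(ByteBuffer buffer) {
            if (buffer.capacity() == poolableSize) {
                free.addLast(buffer);                   // return to the pool for later reuse
            }
            // oversized buffers are simply dropped and left to the garbage collector
        }
    }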
    
    

    5. Sender

    The run() method:

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
        void run(long now) {
        // earlier code omitted
            long pollTimeout = sendProducerData(now);
            client.poll(pollTimeout, now);
        }
    
    

    The sendProducerData() method:

    
    private long sendProducerData(long now) {
    
        // Fetch the Kafka cluster metadata
            Cluster cluster = metadata.fetch();
    
            // get the list of partitions with data ready to send
        // Get the list of partitions with data ready to send
        // ReadyCheckResult holds Set<Node> readyNodes (nodes ready to send to) and Set<String> unknownLeaderTopics
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
            // if there are any partitions whose leaders are not known yet, force metadata update
        // If any partition's leader is still unknown, force a metadata update:
        // add the topic back to the metadata and request an update
            if (!result.unknownLeaderTopics.isEmpty()) {
                // The set of topics with unknown leader contains topics with leader election pending as well as
                // topics which may have expired. Add the topic again to metadata to ensure it is included
                // and request metadata update, since there are messages to send to the topic.
                for (String topic : result.unknownLeaderTopics)
                    this.metadata.add(topic);
                this.metadata.requestUpdate();
            }
            
            // remove any nodes we aren't ready to send to
        // Remove any nodes we are not ready to send to
            Iterator<Node> iter = result.readyNodes.iterator();
            long notReadyTimeout = Long.MAX_VALUE;
            while (iter.hasNext()) {
                Node node = iter.next();
                if (!this.client.ready(node, now)) {
                    iter.remove();
                    notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
                }
            }
    
        // create produce requests
        // Drain the batches, grouped by the destination node id
            Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                    this.maxRequestSize, now);
            if (guaranteeMessageOrder) {
                // Mute all the partitions drained
                for (List<ProducerBatch> batchList : batches.values()) {
                    for (ProducerBatch batch : batchList)
                        this.accumulator.mutePartition(batch.topicPartition);
                }
            }
    
            List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
            // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
            // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
            // we need to reset the producer id here.
            if (!expiredBatches.isEmpty())
                log.trace("Expired {} batches in accumulator", expiredBatches.size());
            for (ProducerBatch expiredBatch : expiredBatches) {
                failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
                if (transactionManager != null && expiredBatch.inRetry()) {
                    // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                    transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
                }
            }
    
            sensors.updateProduceRequestMetrics(batches);
    
            // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
            // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
            // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
            // with sendable data that aren't ready to send since they would cause busy looping.
            long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
            if (!result.readyNodes.isEmpty()) {
                log.trace("Nodes with data ready to send: {}", result.readyNodes);
                // if some partitions are already ready to be sent, the select time would be 0;
                // otherwise if some partition already has some data accumulated but not ready yet,
                // the select time will be the time difference between now and its linger expiry time;
                // otherwise the select time will be the time difference between now and the metadata expiry time;
                pollTimeout = 0;
            }
            sendProduceRequests(batches, now);
    
            return pollTimeout;
        }
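
    Several producer settings referenced above shape the Sender's behavior; a configuration sketch with illustrative values:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    // Upper bound on the size of a single produce request assembled by accumulator.drain()
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    // In the version shown above, batches that sit unacknowledged longer than this are expired and failed
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
    // With only one in-flight request per connection, guaranteeMessageOrder is true and the producer
    // mutes a partition while a batch for it is in flight, preserving per-partition ordering
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);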
    
    

