zoukankan      html  css  js  c++  java
  • 高吞吐量的分布式发布订阅消息系统Kafka之Producer源码分析

    引言

    Kafka是一款很棒的消息系统,今天我们就来深入了解一下它的实现细节,首先关注Producer这一方。

    要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通过这个简单的接口可以控制Producer大部分行为,实例化后就可以调用send方法发送消息了。

    核心实现是这个方法:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
        return doSend(interceptedRecord, callback);//②
    }
    

    通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。

    我们来看看消息类ProducerRecord有哪些属性:

    private final String topic;//主题
    private final Integer partition;//分区
    private final Headers headers;//头
    private final K key;//键
    private final V value;//值
    private final Long timestamp;//时间戳
    

    它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。

    ①中ProducerInterceptors(有0 ~ 无穷多个,形成一个拦截链)对ProducerRecord进行拦截处理(比如打上时间戳,进行审计与统计等操作)

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // 不抛出异常,继续执行下一个拦截器
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
    

    如果用户有定义就进行处理并返回处理后的ProducerRecord,否则直接返回本身。
    然后②中doSend真正发送消息,并且是异步的(源码太长只保留关键):

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 序列化 key 和 value
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
            }
            // 计算分区获得主题与分区
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            // 回调与事务处理省略。
            Header[] headers = record.headers().toArray();
            // 消息追加到RecordAccumulator中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            // 该批次满了或者创建了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (Exception e) {
            // 拦截异常并抛出
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
    

    下面是计算分区的方法:

    private int partition(ProducerRecord<K, V> record, 
    byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        // 消息有分区就直接使用,否则就使用分区器计算
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey,
                         record.value(), serializedValue, cluster);
    }
    

    默认的分区器DefaultPartitioner实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。

    /**
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose a partition in a round-robin fashion
     */
    public class DefaultPartitioner implements Partitioner {
    
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
        
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {//key为空 
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区
                if (availablePartitions.size() > 0) {//有分区,取模就行
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {// 无分区,
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {// key 不为空,计算key的hash并取模获得分区
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin
        }
    }
    

    以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。

    Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并通过Selector将数据发送给Broker)的wakeup方法,实际上是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。

    Sender的run中主要逻辑是不停执行准备消息和等待消息:

    long pollTimeout = sendProducerData(now);//③
    client.poll(pollTimeout, now);//④
    

    ③完成消息设置并保存到信道中,然后监听感兴趣的key,由KafkaChannel实现。

    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
    
    // transportLayer的一种实现中的相关方法
    public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);
    }
    

    ④主要是Selector的poll,其select被wakeup唤醒:

    public void poll(long timeout) throws IOException {
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int numReadyKeys = select(timeout);//wakeup使其停止阻塞
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    
        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
    
            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }
    
            // Poll from channels where the underlying socket has more data
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();
    
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }
    
        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    }
    

    其中pollSelectionKeys方法会调用如下方法完成消息发送:

    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }
    
    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }
    

    Send是一次数据发包,一般由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl:

    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
    
    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }
    

    到此就把Producer的业务相关逻辑处理和非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。

    比如顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性

    public boolean canSendMore(String node) {
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
    

    再比如可靠性,通过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:

      RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch
            }
        };
        
         /**
         * 完成或者重试投递,这里如果acks不对就会重试
         *
         * @param batch The record batch
         * @param response The produce response
         * @param correlationId The correlation id for the request
         * @param now The current POSIX timestamp in milliseconds
         */
        private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                   long now, long throttleUntilTimeMs) {
        }
        
        public class ProduceResponse extends AbstractResponse {
          /**
             * Possible error code:
             * INVALID_REQUIRED_ACKS (21)
             */
        }
    

    kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。

  • 相关阅读:
    062 Unique Paths 不同路径
    061 Rotate List 旋转链表
    060 Permutation Sequence 排列序列
    059 Spiral Matrix II 旋转打印矩阵 II
    058 Length of Last Word 最后一个单词的长度
    057 Insert Interval 插入区间
    bzoj3527: [Zjoi2014]力
    bzoj2194: 快速傅立叶之二
    bzoj2820: YY的GCD
    bzoj2005: [Noi2010]能量采集
  • 原文地址:https://www.cnblogs.com/MonsterJ/p/12994418.html
Copyright © 2011-2022 走看看