zoukankan      html  css  js  c++  java
  • Kafka2.0生产者客户端源码分析

    1 KafkaProducer 构造器

    1. 初始化参数配置。
    2. 初始化记录累加器 RecordAccumulator。
    3. 初始化 Kafka 连接 KafkaClient,发现集群的所有节点加入缓存。
    4. 初始化实现了 Runnable 接口的 Sender 对象,并在 ioThread 中启动线程。

    2 发送消息

    1. 执行消息拦截器
    2. 查询 Kafka 集群元数据
    3. 序列化 key、value
    4. 获取分区
    5. 把消息添加到记录累加器中
    6. 当 batch 满了,或者创建了新的 batch 后,唤醒 Sender 线程

      核心源码如下

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    	// 执行拦截器
    	ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    	return doSend(interceptedRecord, callback);
    }
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    	TopicPartition tp = null;
    	// 获取元数据
    	ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    	Cluster cluster = clusterAndWaitTime.cluster;
    	// 序列化 key、value
    	byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    	byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    	// 获取分区。
    	// 如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;
    	// 如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
    	int partition = partition(record, serializedKey, serializedValue, cluster);
    	tp = new TopicPartition(record.topic(), partition);
    	// 校验序列化后的记录是否超过限制
    	int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
    			compressionType, serializedKey, serializedValue, headers);
    	ensureValidRecordSize(serializedSize);
    	// 时间戳,默认是 KafkaProducer 初始化时间
    	long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
    	// 初始化回调和响应的拦截器对象
    	Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    	// 把消息添加到记录累加器中
    	RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
    			serializedValue, headers, interceptCallback, remainingWaitMs);
    	if (result.batchIsFull || result.newBatchCreated) {
    		// 当 batch 满了,或者创建了新的 batch 后,唤醒 Sender 线程
    		this.sender.wakeup();
    	}
    	return result.future;
    }
    

     2.1 查询元数据

    1. 如果根据指定的主题和分区能在缓存中查找到,则直接返回元数据,结束流程。
    2. 否则,设置需要更新元数据的标记 needUpdate=true,并获取当前的 version。
    3. 唤醒 Sender 线程,当 Sender 线程判断 needUpdate=true 时,发送获取元数据的请求到 broker,获取到后更新 needUpdate=true,version+1。
    4. 当前线程判断,如果 version 变大,说明元数据已更新,则跳出循环,拉取新的元数据,判断是否匹配到主题和分区,如果没有匹配到,返回第2步。
    5. 如果 version 没变大,说明元数据还没更新,则调用 wait(long timeout) 方法,等待 timeout 时间后,返回第4步。
    6. 当第4步获取到匹配的元数据后,返回给 doSend 方法。

      核心源码如下

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) {
    	// 获取缓存的集群信息
    	Cluster cluster = metadata.fetch();
    	Integer partitionsCount = cluster.partitionCountForTopic(topic);
    	// Return cached metadata if we have it, and if the record's partition is either undefined
    	// or within the known partition range
    	// 如果缓存中的数据满足条件,直接返回缓存中的元数据。
    	if (partitionsCount != null && (partition == null || partition < partitionsCount))
    		return new ClusterAndWaitTime(cluster, 0);
    
    	long begin = time.milliseconds();
    	long remainingWaitMs = maxWaitMs;
    	long elapsed;
    	do {
    		// 更新元数据的标记 needUpdate=true,并获取当前的 version。
    		int version = metadata.requestUpdate();
    		sender.wakeup(); // 唤醒 Sender 线程
    		try {
    			metadata.awaitUpdate(version, remainingWaitMs); // 等待更新
    		} catch (TimeoutException ex) {
    		}
    		cluster = metadata.fetch(); // 重新获取元数据
    		elapsed = time.milliseconds() - begin;
    		if (elapsed >= maxWaitMs) // 超出最大等待时间,抛出异常
    			throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
    		remainingWaitMs = maxWaitMs - elapsed;
    		partitionsCount = cluster.partitionCountForTopic(topic);
    	} while (partitionsCount == null); // 分区数量是 0,继续上述循环
    	if (partition != null && partition >= partitionsCount) { // 当指定的分区号大于等于分数总数时,异常
    		throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    	}
    	return new ClusterAndWaitTime(cluster, elapsed);
    }
    // 等待更新
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         // 版本号<=当前版本号,说明未更新,需要继续循环等待更新
         while ((this.version <= lastVersion) && !isClosed()) {
             if (remainingWaitMs != 0)
                 wait(remainingWaitMs); // 等待一会再判断
             long elapsed = System.currentTimeMillis() - begin;
             if (elapsed >= maxWaitMs) // 超过了最大等待时间
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             remainingWaitMs = maxWaitMs - elapsed;
         }
     }
    

     2.2 消息添加到累加器 RecordAccumulator

      2.2.1 缓冲池 BufferPool

      Kafka 使用缓冲池技术给消息分配堆字节缓存 HeapByteBuffer,缓冲池的空闲队列 free 存放了空闲的缓存队列,优先直接从中取出第一个进行分配缓存,如果缓冲池不够了,利用 ReentrantLock + Condition 构造等待队列,等待缓冲池足够分配。
      Kafka 在处理消息响应时,释放分配的内存,并把加入空闲队列 free。

    // 缓冲池
    public class BufferPool {
    	// 可用总内存 buffer.memory
        private final long totalMemory;
    	// 一批消息的大小 batch.size
        private final int poolableSize;
        private final ReentrantLock lock;
    	// 空闲缓存队列
        private final Deque<ByteBuffer> free;
    	// 等待队列
        private final Deque<Condition> waiters;
        // 可用未分配的内存总量是nonPooledAvailableMemory和free * poolableSize中字节缓冲区的总和。
        private long nonPooledAvailableMemory;
    }
    // 字节缓冲分配
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    	if (size > this.totalMemory)
    		throw new IllegalArgumentException("消息大小超过总内存");
    
    	ByteBuffer buffer = null;
    	this.lock.lock();
    	try {
    		// 直接在空闲队列分配
    		if (size == poolableSize && !this.free.isEmpty())
    			return this.free.pollFirst();
    
    		// 计算空闲队列总大小
    		int freeListSize = this.free.size() * this.poolableSize;
    		if (this.nonPooledAvailableMemory + freeListSize >= size) { // 可用的总内存(未分配的+空闲队列)>消息大小
    			// we have enough unallocated or pooled memory to immediately
    			// satisfy the request, but need to allocate the buffer
    			freeUp(size);
    			this.nonPooledAvailableMemory -= size; // 未分配内存总数-消息大小
    		} else { // 内存不够分配
    			int accumulated = 0;
    			Condition moreMemory = this.lock.newCondition();
    			try {
    				long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
    				this.waiters.addLast(moreMemory); // 加入等待队列
    				// loop over and over until we have a buffer or have reserved
    				// enough memory to allocate one
    				while (accumulated < size) { //  轮询,直到足够分配内存
    					long startWaitNs = time.nanoseconds();
    					long timeNs;
    					boolean waitingTimeElapsed;
    					try { // 等待一段时间
    						waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    					}
    
    					remainingTimeToBlockNs -= timeNs;
    
    					// 直接在空闲队列分配
    					if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
    						buffer = this.free.pollFirst();
    						accumulated = size;
    					} else { // 内存不够,accumulated累加计数
    						freeUp(size - accumulated);
    						int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
    						this.nonPooledAvailableMemory -= got;
    						accumulated += got;
    					}
    				}
    				accumulated = 0; // 清空
    			}
    		}
    	}
    
    	if (buffer == null) // 没有在空闲队列分配到内存,需要在堆上分配内存
    		return new HeapByteBuffer(size, size);
    	else
    		return buffer;
    }
    private void freeUp(int size) {
    	while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    		this.nonPooledAvailableMemory += this.free.pollLast().capacity(); // 释放空闲队列的内存
    }
    // 处理生产者响应消息时,释放分配的内存
    public void deallocate(ByteBuffer buffer, int size) {
    	lock.lock();
    	try {
    		if (size == this.poolableSize && size == buffer.capacity()) {
    			buffer.clear();
    			this.free.add(buffer); // 加到空闲队列
    		} else {
    			this.nonPooledAvailableMemory += size; // 增加未分配内存数量
    		}
    		Condition moreMem = this.waiters.peekFirst();
    		if (moreMem != null)
    			moreMem.signal();
    	} finally {
    		lock.unlock();
    	}
    }
    

      2.2.2 消息缓存 CopyOnWriteMap

      累加器使用 CopyOnWriteMap 来缓存消息,key 是主题分区信息,value 是个双端队列,队列中的对象是压缩后的批量消息。

    // 累加器缓存
    ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
    

      CopyOnWriteMap 是线程安全的,是由 Kafka 实现的写时复制 Map,内部定义了 volatile 的 Map,读时不用加锁,直接读取,写时需要加锁,然后拷贝一个 Map 副本进行实际的写入,写入完成后再把原来的 Map 指向修改后的 Map。
      双端队列 Deque 实际上就是 ArrayDeque,非线程安全的,需要手动同步。使用双端队列可以在消息发送失败时,把消息直接放回队列头部进行重试。

    // 累加消息到缓存
    public RecordAppendResult append(TopicPartition tp,
    								 long timestamp,
    								 byte[] key,
    								 byte[] value,
    								 Header[] headers,
    								 Callback callback,
    								 long maxTimeToBlock) throws InterruptedException {
    	ByteBuffer buffer = null;
    	try {
    		Deque<ProducerBatch> dq = getOrCreateDeque(tp); // 检查 batches 是否有该分区的映射,如果没有,则创建一个
    		synchronized (dq) { // 加锁后分配
    			RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    			if (appendResult != null)
    				return appendResult;
    		}
    
    		byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    		// 计算消息大小
    		int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
    		buffer = free.allocate(size, maxTimeToBlock); // 利用 BufferPool 分配字节缓存
    		synchronized (dq) { // 加锁后分配
    			RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    			// 构造出压缩后的批量消息对象 ProducerBatch
    			MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
    			ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
    			FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
    
    			dq.addLast(batch); // 加入双端队列
    
    			return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
    		}
    	}
    }
    
  • 相关阅读:
    ftp上传工具类&FTPClient通过代理连接到FTP服务器
    JS判断闰年与获取月份天数
    SpringBoot文件上传文件大小限制The field file exceeds its maximum permitted size of 1048576 bytes.
    JSON.parse解析json字符串包含 回车换行符报错
    JS解决加减乘除浮点类型丢失精度问题
    Mysql连接异常java.sql.SQLException: The server time zone value '?й???????' is unrecognized or represents more than one time zone.
    Springboot基于Guava+自定义注解实现IP或自定义key限流 升级版
    Centos7设置默认进入图形界面和命令行界面
    第一节 RabbitMQ入门及安装
    第三节 Kafka性能优化实践
  • 原文地址:https://www.cnblogs.com/bigshark/p/11183758.html
Copyright © 2011-2022 走看看