  • Kafka 2.0 Producer Client Source Code Analysis

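      When Kafka initializes the producer client, it creates and starts the Sender thread. The Sender thread sends messages and handles their responses. A “volatile boolean running” flag keeps the Sender thread looping, and each iteration calls the poll method of NetworkClient. NetworkClient is the class Kafka uses to communicate with brokers. It implements the KafkaClient interface and is ultimately built on JDK NIO, which Kafka wraps in its own Selector class.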
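
      The flag-controlled loop described above can be sketched roughly as follows. This is a minimal illustration, not Kafka's Sender: the class SenderLoopSketch, its runOnce and runFor methods, and the 1 ms sleep standing in for a bounded network poll are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sender thread; not Kafka's actual Sender class.
class SenderLoopSketch implements Runnable {
    // volatile so that a write from the closing thread is visible to the loop thread
    private volatile boolean running = true;
    private final AtomicInteger polls = new AtomicInteger();

    @Override
    public void run() {
        while (running) {
            runOnce(System.currentTimeMillis());
        }
    }

    // Stand-in for "prepare data, then poll the network client" (assumption).
    private void runOnce(long now) {
        polls.incrementAndGet();
        try {
            Thread.sleep(1); // simulates a poll call with a bounded timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }

    void initiateClose() {
        running = false; // the loop exits after the current iteration
    }

    int pollCount() {
        return polls.get();
    }

    // Runs the loop for roughly `millis` ms; returns the poll count, or -1 if the loop did not stop.
    static int runFor(long millis) {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread thread = new Thread(sender, "sender-sketch");
        thread.start();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sender.initiateClose();
        }
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.isAlive() ? -1 : sender.pollCount();
    }
}
```

      Because the flag is only read at the top of each iteration, the loop stops promptly only when every poll call has a bounded timeout.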

    Call Relationships

      The Sender's execution can be roughly divided into two phases: preparing to send and starting the send.

    void run(long now) {
    	long pollTimeout = sendProducerData(now); // Prepare to send
    	client.poll(pollTimeout, now); // Start sending
    }
    

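    Send Preparation

    1. Drain the records from the record accumulator and convert them into a node -> batch list mapping, Map<Integer, List<ProducerBatch>> batches
    2. Build sendable requests from these batches and cache them in InFlightRequests
    3. Get the KafkaChannel, attach the NetworkSend to it, and register the write event OP_WRITE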
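    private long sendProducerData(long now) {
    	// Excerpt: cluster, result, and pollTimeout are computed elsewhere in the full method (omitted here)
    	// Convert the partition -> batch queue mapping into a node -> batch list mapping
    	Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    	// Prepare the messages for sending
    	sendProduceRequests(batches, now);
    	return pollTimeout;
    }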
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    	Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    	final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    
    	ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
    			produceRecordsByPartition, transactionalId);
    	RequestCompletionHandler callback = new RequestCompletionHandler() {
    		public void onComplete(ClientResponse response) { // Callback invoked when the request completes
    			handleProduceResponse(response, recordsByPartition, time.milliseconds());
    		}
    	};
    	// Build the request object (excerpt: nodeId is derived from destination in the full method)
    	ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
    	client.send(clientRequest, now);
    }
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    	String destination = clientRequest.destination();
    	RequestHeader header = clientRequest.makeHeader(request.version());
    	// Build a NetworkSend, the implementation of Send
    	Send send = request.toSend(destination, header);
    	InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
    	// Add it to InFlightRequests
    	this.inFlightRequests.add(inFlightRequest);
    	// Bind the NetworkSend to the KafkaChannel and register the write operation
    	selector.send(send);
    }
    public void send(Send send) {
    	String connectionId = send.destination();
    	KafkaChannel channel = openOrClosingChannelOrFail(connectionId); // Get the KafkaChannel
    	channel.setSend(send);
    }
    public void setSend(Send send) {
    	this.send = send; // Bind to the current KafkaChannel
    	this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // Register the write operation
    }
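
      The regrouping in step 1 can be illustrated with a small standalone sketch. It is not Kafka's RecordAccumulator.drain: DrainSketch, the string partition names, and the string batches are hypothetical, and taking at most one batch per partition per call is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of turning "partition -> queued batches" into "node -> batches to send".
class DrainSketch {
    // leaderOf: partition name -> leader node id
    // queues:   partition name -> pending batches (mutable lists, oldest first)
    static Map<Integer, List<String>> drain(Map<String, Integer> leaderOf,
                                            Map<String, List<String>> queues) {
        Map<Integer, List<String>> byNode = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : queues.entrySet()) {
            Integer leader = leaderOf.get(entry.getKey());
            List<String> pending = entry.getValue();
            if (leader == null || pending.isEmpty()) {
                continue; // unknown leader or nothing queued for this partition
            }
            // Assumption: take only the oldest batch of each partition per call.
            String batch = pending.remove(0);
            byNode.computeIfAbsent(leader, id -> new ArrayList<>()).add(batch);
        }
        return byNode;
    }
}
```

      Grouping by node lets one request carry batches for every partition led by the same broker, so the number of requests follows the number of brokers rather than the number of partitions.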
    

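    Starting the Send

    1. Call NIO.Selector.select() to block until events arrive or the timeout expires; it returns the number of ready keys
    2. Process the data in each channel according to the event type (readable/writable)
    3. Add the responses produced by handling each event to a collection, then invoke the request completion handler defined in the preparation phase to process them
    4. Once the responses are processed, invoke Thunk.callback.onCompletion(), the asynchronous callback supplied when the message was sent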
    // The actual send starts here
    public List<ClientResponse> poll(long timeout, long now) {
    	long metadataTimeout = metadataUpdater.maybeUpdate(now);
    	this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // Calls kafka.Selector.poll()
    
    	// Handle the responses
    	long updatedNow = this.time.milliseconds();
    	List<ClientResponse> responses = new ArrayList<>();
    	handleCompletedSends(responses, updatedNow);
    	handleCompletedReceives(responses, updatedNow);
    	...
    	completeResponses(responses); // Invoke the callbacks to process the responses
    
    	return responses;
    }
    // kafka.Selector
    public void poll(long timeout) throws IOException {
    	// Run the NIO Selector; it returns the number of keys whose channels are ready
    	int numReadyKeys = select(timeout); 
    	long endSelect = time.nanoseconds();
    
    	if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
    		Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
            // Poll from channels where the underlying socket has more data
            pollSelectionKeys(readyKeys, false, endSelect);
    	}
    
    	// Add the fully received messages to the completedReceives collection
    	addToCompletedReceives();
    }
    // Handle I/O for the SelectionKeys that are ready
    void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    	for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
    		KafkaChannel channel = channel(key);
    		try {
    			// Check whether the channel is readable
    			if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
    				NetworkReceive networkReceive;
    				while ((networkReceive = channel.read()) != null) { // Non-null only once a complete message has been received
    					madeReadProgressLastPoll = true;
    					addToStagedReceives(channel, networkReceive);
    				}
    			}
    			// Check whether the channel is writable
    			if (channel.ready() && key.isWritable()) {
    				Send send = channel.write(); // Write to the SocketChannel
    			}
    		} catch (Exception e) {
    			// Error handling omitted in this excerpt
    		}
    	}
    }
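
      The register-then-select pattern that setSend and pollSelectionKeys rely on can be tried out with plain JDK NIO. The sketch below is an assumption-laden illustration, not Kafka code: NioPollSketch and roundTrip are hypothetical names, an in-process java.nio.channels.Pipe stands in for the broker socket, and the pending buffer is stored as the key attachment instead of in a KafkaChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: register OP_WRITE for a pending buffer, then let select() drive the I/O.
class NioPollSketch {
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);

                // Comparable to setSend: keep the pending data with the channel and register OP_WRITE.
                ByteBuffer pending = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                sink.register(selector, SelectionKey.OP_WRITE, pending);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer received = ByteBuffer.allocate(pending.remaining());
                // Bounded loop so the sketch cannot spin forever if nothing becomes ready.
                for (int i = 0; i < 100 && received.hasRemaining(); i++) {
                    selector.select(1000); // blocks until a key is ready or the timeout expires
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isValid() && key.isWritable()) {
                            ByteBuffer buf = (ByteBuffer) key.attachment();
                            ((Pipe.SinkChannel) key.channel()).write(buf);
                            if (!buf.hasRemaining()) {
                                key.interestOps(0); // send complete: stop watching OP_WRITE
                            }
                        } else if (key.isValid() && key.isReadable()) {
                            ((Pipe.SourceChannel) key.channel()).read(received);
                        }
                    }
                    selector.selectedKeys().clear(); // the selected-key set is never cleared automatically
                }
                received.flip();
                return StandardCharsets.UTF_8.decode(received).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      Dropping interest in OP_WRITE once the buffer is drained matters because a writable channel is almost always ready, and leaving the interest registered would make every select() call return immediately.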
    

    Overall Flow

    [Figure: overall flow]

  • Original article: https://www.cnblogs.com/bigshark/p/11184070.html