1. Kafka发送流程
kafka的发送流程可以简单概括为如下的图。这幅图我们可以分为三部分来理解。中间的(深蓝色矩形)部分的流程是发送的核心流程(同步发送);左边(淡蓝色)是异步发送时相关的额外流程,右边(黄色)是客户端更新元信息相关的流程。简单概括为:
- 同步发送流程
- 异步发送流程
- 更新元信息流程
2.1 创建生产者
一般在生产者客户端代码中我们使用如下这样的代码来创建一个生产者。
Produce p = new Producer(new ProducerConfig());
实际上在运行该代码后,我们是启动了三个实例,同时也初始化了ProducerConfig类完成了生产者的配置。三个实例如下:
- Producer
- DefaultEventHandler
- ProducerPool: 连接不同kafka broker的生产者池,连接个数有broker.list参数决定
2.2 同步发送数据
- Producer实例调用其send方法
- 本质是调用了Handler的handle(message)
- handler序列化消息
- handler调用dispatchSerializedData方法来调度序列化后的消息
- dispatchSerializedData方法调用partitionAndCollate方法对topic的message进行分组(根据获取的leaderBrokerId元数据来对消息分组)
- 从生产者池中获取不同broker对应的生产者,来真正的发送消息
2.3 异步发送流程
异步发送可以结合同步发送的流程来看。异步发送流程就是在同步发送流程前面多进行了一些额外的流程,来达到异步批量发送的目的。
额外增加的流程为:
- 根据生产者API,采用异步方式,则先将消息写入一个阻塞队列
- DefaultEventHandler定期向阻塞队列拉去消息
- 后面和同步发送流程相同
结合下图的流程来理解同步发送流程和异步发送流程(区别可以看到就是多了一个阻塞队列):
-
同步发送流程
2.异步发送流程