生产者是指向kafka发送消息的程序。根据生产者使用场景的不同,对于消费者的配置也有着不同的要求。我们先来看看生产者向kafka中发送一条消息的过程。
- 生成数据 生产者生成消息数据,数据中包含topic, key, value。partition这四类信息。topic和value由用户指定且必须填写,key可以由用户指定或由客户端自己生成,partition可以指定也可以由分区起进行填写。
- 序列化 生产者生成的消息数据需要进行序列化,序列化后才能在网络上进行传输。
- 分区器 传递的消息中如果指定了partition中可以跳过此步骤,如果没有指定则分区器会通过消息中的key来选择分区,选好分区后也就确定了消息将会发送到哪台broker上。
- 发送数据 相同主题和分区的数据会归纳到同一个批次,这个kafka会统一将这个批次的消息进行发送。会有独立的进程会将记录批次发送到相应的broker上。
- 状态响应 如果消息能够被broker接收则向客户端返回成功信息并返回该消息的topic,partiton以及在分区中的offset。如接收失败则返回失败信息,并进行重试,重试几次仍失败则返回错误信息。
在向kafka发送消息时首先要创建一个生产者对象,并设置一些属性。kafka生产者有三个必要的属性:
- bootstrap.servers 指定broker地址
- key.serializer 指定消息键的序列器
- value.serializer 指定消息值的序列器
使用生产者对象发送producerrecored对象,该对象包含消息信息。生产者调用send()函数即可发送数据,数据发送方式有两种:
- 同步发送消息:
最简单的发送方式send()后会返回一个Future对象,然后调用Future对象的get()方法等待kafka返回响应,如果成功则会返回RecoredMetaData,通过这个对象可以获得offset。如果失败则会返回异常信息。这种方法适合吞吐量不大的系统。- 异步发送消息:
使用异步模式发送消息可以通过回调函数来处理发送信息的异常信息。
其他生产者的配置:- acks: 用来设置broker接收到多少消息时才会向生产者返回成功信息,可以设置(0,1,all)
- buffer.memory 用来设置发送缓冲区的大小
- compression.type 选择消息发送时用于对消息进行压缩的算法
- retires 消息发送失败时,用来设置重发的次数
- batch.size 设置一个批次大小,当同一批次里的消息大小达到设定值时会发送到broker,但不是必须要填满一个批次才会发送。
- linger.ms 在发送当前批次前等待更多消息加入批次的时间。当达到时间上限或批次填满时会发送该批次。
- client.id 服务器用它来标识消息的来源
- max.block.ms 该参数指定了可以阻塞的时间,超过时间时会报异常
- max.request.size 用于控制单个消息的大小
- receive.buffer.bytes和send.buffer.bytes 分别指定了TCP接收和发送缓冲区的大小。
- timeout.ms request.timeout.ms metadata.fetch.timeout.ms 分别是等待副本返回确认时间,等待发送数据到服务器返回确认时间,等发送者获取元数据发回确认时间。
序列化器
如果需要传递的消息不是简单的java数据类型而是自定义的数据类型往往需要我们使用自定义序列化器,但这种方式往往并不是很方便,尤其是涉及大团队联合开发的时候,这时候我们可以使用通用的序列化器,例如Avro ,关于Avro的具体使用方法可以后续自行google。
分区
当消息没有指定key的时候可以采用round robin算法将消息均匀的分布到各个分区上。当消息指定了键时通过散列算法将相同的key分布到同样的分区上。kafka同样支持自定义分区算法,根据需要可以自行指定分区算法。