本文主要实际编程讲解kafka生产者相关内容,版本kafka_2.11-0.10.1.0
。
安装
linux
集群安装过程请参考http://www.cnblogs.com/aidodoo/p/7151949.html。
window
安装过程如下:
下载zookeeper安装包(zookeeper-3.4.6),解压到D:Programzookeeper,并设置环境变量
-
添加系统变量
ZOOKEEPER_HOME=D:Programzookeeper
,并在path
后面添加:%ZOOKEEPER_HOME%in
-
将
zoo_sample.cfg
重命名为zoo.cfg
,修改内容如下:tickTime=4000 initLimit=10 syncLimit=5 dataDir=D:/Program/zookeeper/data clientPort=2181 maxClientCnxns=60 server.1=localhost:2888:3888
-
D:/Program/zookeeper/data
目录下新建文件myid
,并用文本软件打开,填入数字1
下载kafka安装包(kafka_2.11-0.10.1.0),解压到D:Programkafka,并设置环境变量
-
添加系统变量
KAFKA_HOME=D:Programkafka
,并在path
后面添加:%KAFKA_HOME%in
-
修改
D:Programkafkaconfigserver.properties
配置文件如下:broker.id=0 advertised.listeners=PLAINTEXT://LAPTOP-2CBRDCI0:9092 advertised.port=9092 advertised.host.name=LAPTOP-2CBRDCI0 log.dirs=D:/Program/kafka/data/kafka-logs zookeeper.connect=localhost:2181/kafka zookeeper.connection.timeout.ms=60000
启动zookeeper
双击脚本D:ProgramzookeeperinzkServer.cmd
启动kafka
命令行运行
D:Programkafkainwindowskafka-server-start.bat D:/Program/kafka/config/server.properties
kafka创建topic
D:Programkafkainwindowskafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --create --topic TEST1 --replication-factor 1 --partitions 3
D:Programkafkainwindowskafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --describe --topic TEST
实践
依赖
kafka 0.10.1.0
版本中采用KafkaProducer
对象用来向kafka broker
集群发送消息。
编写代码前先引入相关依赖包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
基本配置和发送流程
KafkaProducer
是线程安全的,即可以跨线程共享单个KafkaProducer
实例,我们先看单线程发送消息的示例,以了解kafka
发送消息的流程。
package com.molyeo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by zhangkh on 2018/7/11.
*/
public class SinglekafkaProducerDemo {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.flush();
producer.close();
}
}
我们先构建一个props实例,用于保存kafka配置。
acks
acks
配置项表示消息的确认机制。
acks=0
表示生者不会等服务端确认,消息被立即添加到socket buffer
中,并认为已经发送。这种情况下由于客户端不知道消息是否真实发送成功,配置项中的重试次数项retries
也不会生效(即不会重试),每条消息返回的offset
值均为-1
。
acks=1
表示消息的leader
分区收到消息后则被视为消息已发送成功,不会等待副本分区确认。如果leader
分区收到消息后,然后所在节点立即宕机,follower
分区还来不同步,则消息丢失。
acks=all
或者acks=-1
,表示消息的leader
和follower
分区均已收到后才被视为消息已成功发送。这是最严格的确认机制,只要至少min.insync.replicas
还活着,则消息不会丢失。
retries
如果网络原因或者其他异常导致发送请求失败,生产端可以根据参数retries
进行重试。
batch.size
生产者为每个分区维护未发送消息的缓冲区,缓冲区的大小及batch.size
,默认配置为16384,即16KB
linger.ms
逗留时间,默认为0,即使缓冲区有其他未使用的空间,也可以立即发送。
如果我们希望减少服务端的压力,则可以延迟一定时间,待消息量比较大时批量发送。
简单点说,只要满足batch.size
和linger.ms
中的一个条件,生产者发送线程则会发送请求,具体的要分析org.apache.kafka.clients.producer.internals.Sender
类。
buffer.memory
生产者总的消息缓冲区,超过该大小,阻塞max.block.ms
。
生产者其他配置项可参考http://kafka.apache.org/0101/documentation.html#brokerconfigs
重点说一下KafkaProducer的send方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
这两个send方法均是异步发送,一旦将记录存储在待发送的缓冲区中,均立即返回,这允许并行发送许多记录而不会阻塞,以便在每个记录之后等待响应。
下面的send(ProducerRecord<K, V> record, Callback callback)方法提供了当消息发送成功时的回调,返回的结果RecordMetadata指定记录发送到的分区,分配的偏移量和记录的时间戳。
如果想阻塞同步发送,可以调用Future的get方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
完全异步发送,则采用Callback参数来提供请求完成用的回调:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
多线程并发发送
为充分利用kafka的高吞吐量,生产端可以采用多线程并发发送消息,前文已提到过KafkaProducer
是线程安全的,即可以跨线程共享单个KafkaProducer
实例。
kafka实际配置类KafkaCommonConfig:
package com.molyeo.kafka;
/**
* Created by zhangkh on 2018/7/10.
*/
public class KafkaCommonConfig {
public static String BOOTSTRAP_SERVERS="LAPTOP-2CBRDCI0:9092";
public static String ACKS="all";
public static int RETRIES=0;
public static int BATCH_SIZE=16384;
public static int LINGER_MS=1;
public static int BUFFER_MEMORY=33554432;
public static String KEY_SERIALIZER_CLASS="org.apache.kafka.common.serialization.StringSerializer";
public static String VALUE_SERIALIZER_CLASS= "org.apache.kafka.common.serialization.StringSerializer";
}
消息发送
package com.molyeo.kafka;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by zhangkh on 2018/7/5.
*/
public class MultiKafkaProducerDemo {
private static final int PRODUCER_THREAD_NUM = 5;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_THREAD_NUM);
Producer<String, String> producer = new KafkaProducer<String, String>(getProducerConfig());
String topic = "TEST";
try {
for (int i = 0; i < 20; i++) {
Thread.sleep(20);
String key = Integer.toString(i);
String value = Long.toString(System.currentTimeMillis());
ProducerRecord<String, String> record = new ProducerRecord<>(topic,i%3, key, value);
executorService.submit(new CommonProducerThread<>(producer, record));
}
} catch (Exception e) {
e.printStackTrace();
}
try {
//Block for a while
Thread.sleep(60 * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
executorService.shutdown();
}
}
public static Properties getProducerConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.ACKS_CONFIG, KafkaCommonConfig.ACKS);
props.put(ProducerConfig.RETRIES_CONFIG, KafkaCommonConfig.RETRIES);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, KafkaCommonConfig.BATCH_SIZE);
props.put(ProducerConfig.LINGER_MS_CONFIG, KafkaCommonConfig.LINGER_MS);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, KafkaCommonConfig.BUFFER_MEMORY);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.KEY_SERIALIZER_CLASS);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.VALUE_SERIALIZER_CLASS);
return props;
}
}
class CommonProducerThread<K, V> implements Runnable {
Logger logger = LoggerFactory.getLogger(CommonProducerThread.class.getSimpleName());
private final Producer producer;
private final ProducerRecord<K, V> record;
public CommonProducerThread(Producer producer, ProducerRecord record) {
this.producer = producer;
this.record = record;
}
@Override
public void run() {
logger.info("prepare to send msg:thread name={},key={},value={}", Thread.currentThread().getName(), record.key(), record.value());
producer.send(record, new ProducerAckCallback(System.currentTimeMillis(), record.key(), record.value()));
}
}
class ProducerAckCallback<K, V> implements Callback {
Logger logger = LoggerFactory.getLogger(ProducerAckCallback.class.getSimpleName());
private final long startTime;
private final K key;
private final V value;
public ProducerAckCallback(long startTime, K key, V value) {
this.startTime = startTime;
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
logger.info("send success:key {},value {}, sent to partition {},offset {} in {} ms", key, value, metadata.partition(), metadata.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
}
}
在程序中我们采用5个线程去发送20条消息,并且指定了消息的分区,每条消息的间隔20ms。输出结果如下:
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=0,value=1531360688703
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=1,value=1531360688748
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=2,value=1531360688853
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=3,value=1531360688878
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=4,value=1531360688900
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=5,value=1531360688921
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=6,value=1531360688942
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 3,value 1531360688878, sent to partition 0,offset 59 in 91 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 0,value 1531360688703, sent to partition 0,offset 60 in 241 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 6,value 1531360688942, sent to partition 0,offset 61 in 17 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 1,value 1531360688748, sent to partition 1,offset 68 in 121 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 4,value 1531360688900, sent to partition 1,offset 69 in 71 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 2,value 1531360688853, sent to partition 2,offset 33 in 95 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 5,value 1531360688921, sent to partition 2,offset 34 in 19 ms
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=7,value=1531360688976
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 7,value 1531360688976, sent to partition 1,offset 70 in 5 ms
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=8,value=1531360688997
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 8,value 1531360688997, sent to partition 2,offset 35 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=9,value=1531360689022
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 9,value 1531360689022, sent to partition 0,offset 62 in 3 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=10,value=1531360689043
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 10,value 1531360689043, sent to partition 1,offset 71 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=11,value=1531360689065
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 11,value 1531360689065, sent to partition 2,offset 36 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=12,value=1531360689089
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 12,value 1531360689089, sent to partition 0,offset 63 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=13,value=1531360689118
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 13,value 1531360689118, sent to partition 1,offset 72 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=14,value=1531360689141
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 14,value 1531360689141, sent to partition 2,offset 37 in 7 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=15,value=1531360689174
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 15,value 1531360689174, sent to partition 0,offset 64 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=16,value=1531360689198
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 16,value 1531360689198, sent to partition 1,offset 73 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=17,value=1531360689220
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 17,value 1531360689220, sent to partition 2,offset 38 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=18,value=1531360689245
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 18,value 1531360689245, sent to partition 0,offset 65 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=19,value=1531360689266
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 19,value 1531360689266, sent to partition 1,offset 74 in 6 ms
本文总结了kafka生产者常用配置,并用多线程发送消息。
本文参考:
http://kafka.apache.org/0101/documentation.html#brokerconfigs
关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码