正常的消费逻辑需要以下几步:
- 配置消费者相关参数
- 创建一个消费者对象
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
示例代码:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author: masheng
* @description: Kafka消费者客户端实例
* @date: 2020/07/27 22:12
*/
public class KafkaConsumerAnalyze {
private static final String TOPIC = "topic_test";
private static final String BROKER_LIST = "localhost:9092";
private static final String GROUP_ID = "group_test";
private static final AtomicBoolean isRunning = new AtomicBoolean(true);
/*
* 功能描述: 初始化配置
* @author: masheng
* @time: 2020/7/27
* @param
* @return: java.util.Properties
*/
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.test");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return properties;
}
public static void main(String[] args) {
//1.配置消费者相关参数
Properties properties = initConfig();
//2.创建一个消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//3.订阅主题
consumer.subscribe(Arrays.asList(TOPIC));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
1.参数配置
4个必填参数:
- bootstrap.servers
- group.id:消费组名称
- key.serializer
- value.serializer
2.订阅主题与分区
一个消费组可以订阅一个或多个主题,如果消费者前后两次订阅了不同的主题,以最后一次为准,可以通过正则的方式订阅主题
方式1:subscribe()方法
方式2:assign()方法,指定需要订阅的分区集合
public void assign(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
if (partitions == null) {
throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
} else if (partitions.isEmpty()) {
this.unsubscribe();
} else {
Set<String> topics = new HashSet<>();
for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
topics.add(topic);
}
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
}
} finally {
release();
}
}
可以通过KafkaConsumer中的partitionsFor()方法查询指定主题的元数据信息:
public List<PartitionInfo> partitionsFor(String topic) {
return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}
PartitionInfo为主题的分区元数据信息:
public class PartitionInfo {
private final String topic; //主题
private final int partition; //分区
private final Node leader; //leader副本所在位置
private final Node[] replicas; //分区的AR集合
private final Node[] inSyncReplicas; //ISR集合
private final Node[] offlineReplicas; //OSR集合
}
取消订阅:
使用unsubscribe()方法
总结:
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,而通过assign()方法订阅分区时不具备消费者自动均衡功能
3.反序列化
反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer等,实现了Deserializer接口,该接口有三个方法:
//配置当前类
void configure(Map<String, ?> configs, boolean isKey);
//执行反序列化
T deserialize(String topic, byte[] data);
//关闭反序列化器
void close();
StringDeserializer类代码如下:
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
//把byte[]类型转换为String类型
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
4.消息消费
使用poll()方法
消费者消费到的消息类型为ConsumerRecord:
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic; //主题
private final int partition; //分区
private final long offset; //偏移量
private final long timestamp; //时间戳
private final TimestampType timestampType; //时间戳类型,CreateTime和LogAppendTime
private final int serializedKeySize; //key经过序列化之后的大小
private final int serializedValueSize; //value经过序列化之后的大小
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum; //CRC32的校验值
}
poll()方法返回值类型时ConsumerRecords,表示一次拉取操作所获得的消息集,内部包含若干ConsumerRecord
5.位移提交
新消费者客户端中,消费位移储存在Kafka内部的主题_consumer_offsets中
假设当前消费者已经消费了x位置的消息,则需要提交的消费位移时x+1,代表下一条需要拉取的消息的位置
KafkaConsumer类提供了position(TopicPartition)和committed(TopicPartition)
方法分别获取消费位置和提交位移
消费位移演示如下:
public class KafkaProducerAnalyze {
private static final String TOPIC = "topic_test";
private static final String BROKER_LIST = "localhost:9092";
/*
* 功能描述: 初始化配置
* @author: masheng
* @time: 2020/7/27
* @param
* @return: java.util.Properties
*/
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
//所有副本都复制完返回成功,延迟最高,可以设置all、0、1三种
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
//1.配置相关参数
Properties properties = initConfig();
//2.初始化生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//3.构建发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello,Kafka!");
try {
//4.发送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
//5.关闭生产者实例
producer.close();
}
}
}
Kafka中默认的消费位移的提交方式时自动提交,由消费者客户端参数enable.auto.commit配置,默认为true,定期5秒提交一次,由参数auto.commit.interval.ms配置,自动位移提交会带来重复消费和消息漏消费问题,比如刚提交完消费位移,在下一次提交消费位移之前消费者崩溃了,需要从上一次提交位移的地方重新消费,造成重复消费,如果提交线程先于处理线程,则会造成漏消费
生产中需要开启手动提交,分为同步提交和异步提交,对应于commitSync()和commitAsync()两个方法
同步提交:
consumer.commitSync
可以改为批量处理+批量提交的方式,将拉取到的消息存入缓存,等积累到足够多再批量提交
异步提交:
consumer.commitAsync,提供了一个异步提交的回调函数
异步提交如果失败,可能会导致重复消费问题,可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后增加序号相对应的值,如果遇到位移提交失败需要重试的时候,检查所提交的位移和序号的值的大小,如果前者小于后者,说明有更大的位移提交了,不需要进行本次重试,如果相同,说明可以进行重试提交
6.控制或关闭消费
使用pause()和resume()方法实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作
退出消费循环:
使用isRunning.get()方式,通过在其他地方设定该boolean值,或者可以调用KafkaConsumer的weakup()方法,跳出循环
7.指定消费位移
seek()方法,可以指定partition和offset,seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置
8.再均衡
再均衡是指分区的所属权从一个消费者转移到另一个消费者,一般情况下,应尽量避免不必要的再均衡的发生
再均衡监听器ConsumerRebalanceListener,包含两个方法:
//再均衡开始之前和消费者停止读取消息之后调用,可以处理消费位移的提交,避免一些不必要的重复消费
void onPartitionsRevoked(Collection<TopicPartition> partitions);
//重新分配分区和消费者开始读取消息之前被调用
void onPartitionsAssigned(Collection<TopicPartition> partitions);
下面是一个能极大程度防止重复消费的例子:
public class MessageConsumer {
private static final String TOPIC = "education-info";
private static final String BROKER_LIST = "localhost:9092";
private static KafkaConsumer<String, String> kafkaConsumer = null;
private static Map<TopicPartition, OffsetAndMetadata> currentoffsets = new HashMap<>();
static {
Properties properties = initConfig();
kafkaConsumer = new KafkaConsumer<String, String>(properties);
//订阅消息实现再均衡的回调方法,在此方法中手动提交偏移量,确保再均衡前偏移量提交成功
kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//再均衡之前和消费者停止读取消息之后调用
kafkaConsumer.commitSync(currentoffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}
});
}
private static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
//设置不自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return properties;
}
public static void main(String[] args) {
try {
while (true) {
//循环拉取消息,100ms是等待broker返回数据的时间,超过时间没有响应则不再等待
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
//循环处理
for (ConsumerRecord record : records) {
try {
System.out.println(record.value());
currentoffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
} catch (Exception e) {
e.printStackTrace();
}
}
//正常消费时,异步提交offset
kafkaConsumer.commitAsync(currentoffsets,null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//发生异常时,手动提交
kafkaConsumer.commitSync();
} finally {
kafkaConsumer.close();
}
}
}
}
9.多线程实现
KafkaConsumer是非线程安全的,其中定义了一个acquire()方法,用来检测当前是否只有一个线程在操作,acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作
实现方式:
public class MultiConsumerThreadDemo {
private static final String TOPIC = "topic_test";
private static final String BROKER_LIST = "localhost:9092";
private static final String GROUP_ID = "group_test";
//引入共享变量参与提交,RecordHandler类处理完消息后将消费位移保存到offsets,KafkaConsumerThread在每次poll方法之后提交,
// 这种可能会造成数据丢失
private static Map<TopicPartition, OffsetAndMetadata> offsets;
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return properties;
}
public static void main(String[] args) {
Properties properties = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties, TOPIC, Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
private int threadNumber;
public KafkaConsumerThread(Properties properties, String topic, int threadNumber) {
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;
this.executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
executorService.submit(new RecordHandler(records));
}
synchronized (offsets) {
if (!offsets.isEmpty()){
consumer.commitSync(offsets);
offsets.clear();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
private class RecordHandler extends Thread {
public final ConsumerRecords<String, String> records;
private RecordHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
//处理records
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = this.records.records(tp);
//处理tpRecords
long lastComsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
synchronized (offsets){
if (!offsets.containsKey(tp)){
offsets.put(tp,new OffsetAndMetadata(lastComsumedOffset + 1));
} else {
long position = offsets.get(tp).offset();
if (position < lastComsumedOffset + 1) {
offsets.put(tp,new OffsetAndMetadata(lastComsumedOffset + 1));
}
}
}
}
}
}
}
}