消费者和消费者群组
生产者往主题写入消息的速度超过了应用程序验证数据的速度。如果使用单个消费者处理消息,应用程序跟不上消息生成的速度。此时,有必要对消费者进行横向伸缩,我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。
一个群组里的消费者订阅同一个主题,每个消费者接收主题的一部分分区的消息。
不同群组之间消费者互不影响
消费者编程
在读取消息之前,需要先创建一个KafkaConsumer对象。
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
订阅
创建消费者后,下一步要订阅主题,subscribe()方法接收一个主题列表作为参数
consumer.subscribe(Collections.singletonList("customerCountries"));
分区分配策略
Kafka有两种分配策略,一是RoundRobin,一是Range
RoundRobin消息轮询(轮流询问)是消费者API的核心,通过一个简单的轮询向服务器请求数据。轮询会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
log.debug("topic = %s, partition = %s, offset = %d, customer = %s,
country = %s
",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close();
}
Range根据当前的消费者组来划分,一个范围一个范围来分配。会产生消费者之间订阅数量差距过大的问题。
提交和偏移量
每次调用poll()方法,会返回生产者写入kafka但还没被读取的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。
消费者默认将offset保存在kafka内置的topic中,该topic为__consumer_offset