1. Simple Consumer (Auto-Commit)
1.1 Writing the Consumer Code
package com.wn.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/* Simple consumer */
public class MyConsumer {
    public static void main(String[] args) {
        // Consumer configuration
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        // Enable auto-commit of offsets
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval (ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "aaa");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to topics; one consumer can subscribe to several topics
        consumer.subscribe(Arrays.asList("aaa", "wnwn"));
        // Fetch messages
        while (true) {
            // poll() waits at most 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition());
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.offset());
                System.out.println("------------------------------------------");
            }
        }
    }
}
1.2 Start ZooKeeper and Kafka
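Assuming the default distribution layout and the stock config files (the paths are an assumption; adjust them to your installation), ZooKeeper and each of the three brokers can be started with the bundled scripts:

# Start ZooKeeper (on the ZooKeeper host)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start the Kafka broker (on each broker)
bin/kafka-server-start.sh -daemon config/server.properties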
1.3 Run the Example
1.3.1 Start the consumer (run the main method)
1.3.2 Start the producer
bin/kafka-console-producer.sh --broker-list 192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092 --topic aaa
1.3.3 View the results: the consumer console prints each record's partition, key, value, and offset.
2. Resetting the Consumer Offset via the API (Auto-Commit)
2.1 Writing the Code
package com.wn.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/* Consumer that resets its offset */
public class MyConsumerOffset {
    public static void main(String[] args) {
        // Consumer configuration
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        // Enable auto-commit of offsets
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval (ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group: to re-consume from the beginning, this must be a NEW group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bbb");
        // Reset the offset: "earliest" starts from the beginning when the group has no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to topics; one consumer can subscribe to several topics
        consumer.subscribe(Arrays.asList("aaa", "wnwn"));
        // Fetch messages
        while (true) {
            // poll() waits at most 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition());
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.offset());
                System.out.println("------------------------------------------");
            }
        }
    }
}
Note: to re-consume data that has already been consumed, the consumer group must be changed to a new one, because auto.offset.reset only takes effect when the group has no committed offset. If the group id is left unchanged, only messages that have not yet been consumed will be read (start the producer again to produce fresh data). An alternative that keeps the same group id is sketched below.
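If creating a new group is not desirable, the same effect can be achieved by seeking explicitly. This is a minimal sketch of that alternative, not part of the original example; it assumes the same cluster and topics as above:

package com.wn.consumer;

import org.apache.kafka.clients.consumer.*;

import java.util.Arrays;
import java.util.Properties;

/* Sketch: re-consume from the beginning with the SAME group id */
public class MyConsumerSeek {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "aaa"); // existing group, unchanged
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("aaa", "wnwn"));
        // The first poll(s) trigger partition assignment; then rewind every assigned partition
        while (consumer.assignment().isEmpty()) {
            consumer.poll(100);
        }
        consumer.seekToBeginning(consumer.assignment());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value() + " @ offset " + record.offset());
            }
        }
    }
}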
2.2 Run and Test
Previously consumed data is consumed again.
3. Committing Offsets Manually via the API
Auto-committing offsets is convenient, but because commits are driven by a timer, developers have little control over when an offset actually gets committed. Kafka therefore also provides an API for committing offsets manually.
There are two ways to commit manually: commitSync (synchronous) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference: commitSync blocks the current thread until the commit succeeds and retries automatically on failure (though commits can still fail for reasons beyond the client's control), whereas commitAsync has no retry mechanism, so a commit may be lost.
3.1 Committing Offsets Synchronously
Because synchronous commits retry on failure, they are more reliable. Below is an example of committing offsets synchronously.
package com.wn.consumer;

import org.apache.kafka.clients.consumer.*;

import java.util.Arrays;
import java.util.Properties;

/* Manual offset commit: synchronous */
public class MyConsumerCommit {
    public static void main(String[] args) {
        // Consumer configuration
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        // Disable auto-commit: offsets will be committed manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval (has no effect while auto-commit is disabled)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ccc");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to topics; one consumer can subscribe to several topics
        consumer.subscribe(Arrays.asList("aaa", "wnwn"));
        // Fetch messages
        while (true) {
            // poll() waits at most 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition());
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.offset());
                System.out.println("------------------------------------------");
            }
            // Synchronous commit: blocks the current thread until the offsets are committed (lower throughput)
            consumer.commitSync();
        }
    }
}
3.2 Committing Offsets Asynchronously
Synchronous commits are more reliable, but they block the current thread until the commit succeeds, which significantly hurts throughput. In most cases, therefore, asynchronous commits are preferred.
package com.wn.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/* Manual offset commit: asynchronous (renamed so it can coexist with the synchronous example) */
public class MyConsumerCommitAsync {
    public static void main(String[] args) {
        // Consumer configuration
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        // Disable auto-commit: offsets will be committed manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval (has no effect while auto-commit is disabled)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ccc");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to topics; one consumer can subscribe to several topics
        consumer.subscribe(Arrays.asList("aaa", "wnwn"));
        // Fetch messages
        while (true) {
            // poll() waits at most 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition());
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.offset());
                System.out.println("------------------------------------------");
            }
            // Asynchronous commit: returns immediately; the callback reports failures
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                    if (e != null) {
                        System.out.println("commit failed for " + offsets);
                    }
                }
            });
        }
    }
}
3.3 Missed and Duplicate Consumption
Whether offsets are committed synchronously or asynchronously, data may still be missed or consumed more than once. Committing the offset before processing the data can cause missed messages; processing the data before committing the offset can cause duplicates. A sketch of both orderings follows.
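The following sketch contrasts the two orderings. It is illustrative only: processRecord() is a hypothetical stand-in for real business logic, and the consumer is assumed to be configured as in the examples above with auto-commit disabled.

package com.wn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* Sketch: how commit ordering causes missed vs. duplicate consumption */
public class CommitOrdering {

    // Commit first, process later -> at-most-once:
    // a crash after commitSync() but before processing loses those messages.
    static void commitThenProcess(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        consumer.commitSync();
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record); // crash here = missed messages
        }
    }

    // Process first, commit later -> at-least-once:
    // a crash after processing but before commitSync() re-delivers the batch.
    static void processThenCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitSync(); // crash before this line = duplicate consumption
    }

    // Hypothetical stand-in for real business logic
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}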
4. Custom Offset Storage
Before Kafka 0.9, offsets were stored in ZooKeeper; from 0.9 onward they are stored by default in a built-in Kafka topic (__consumer_offsets). Beyond these two options, Kafka also lets you store offsets anywhere you choose.
Maintaining offsets yourself is fairly involved, because consumer rebalances have to be taken into account.
When a new consumer joins the group, an existing consumer leaves the group, or the partitions of a subscribed topic change, the partitions are redistributed among the consumers. This redistribution is called a rebalance.
After a rebalance, each consumer owns a different set of partitions. Every consumer must therefore first find out which partitions it has been assigned, then seek to the most recently committed offset of each partition and continue consuming from there.
Implementing custom offset storage requires a ConsumerRebalanceListener. Example code follows; the methods that commit and fetch offsets must be implemented against whatever storage system you choose.
package com.wn.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/* Custom offset storage */
public class MyConsumerCommitMy {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
        // Consumer configuration
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        // Disable auto-commit: offsets are tracked and stored manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval (has no effect while auto-commit is disabled)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ccc");
        // Create the consumer
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe with a rebalance listener
        consumer.subscribe(Arrays.asList("aaa"), new ConsumerRebalanceListener() {
            // Called BEFORE a rebalance: persist what has been consumed so far
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            // Called AFTER a rebalance: seek each newly assigned partition to its
            // most recently stored offset and continue consuming from there
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));
                }
            }
        });
        // Fetch messages
        while (true) {
            // poll() waits at most 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition());
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.offset());
                // Remember the NEXT offset to consume (last processed + 1)
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
                System.out.println("------------------------------------------");
            }
            // Persist the offsets of this batch to the custom store
            commitOffset(currentOffset);
        }
    }

    // Fetch a partition's most recently stored offset from the custom store (stub)
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    // Persist the offsets of all partitions owned by this consumer (stub)
    public static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}
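As one concrete possibility for the two stubs, here is a hypothetical sketch backed by a MySQL table. The class, table schema, JDBC URL, and credentials are all assumptions, not part of the original code; MyConsumerCommitMy could delegate its getOffset/commitOffset to this class.

package com.wn.consumer;

import org.apache.kafka.common.TopicPartition;

import java.sql.*;
import java.util.Map;

/* Hypothetical JDBC-backed offset store; assumed schema:
   CREATE TABLE offsets (topic VARCHAR(255), part INT, off BIGINT, PRIMARY KEY (topic, part)); */
public class JdbcOffsetStore {
    private static final String JDBC_URL = "jdbc:mysql://192.168.138.55:3306/kafka_offsets"; // assumed
    private static final String USER = "user";         // assumed
    private static final String PASSWORD = "password"; // assumed

    // Fetch a partition's most recently stored offset; 0 if nothing stored yet
    public static long getOffset(TopicPartition partition) {
        String sql = "SELECT off FROM offsets WHERE topic = ? AND part = ?";
        try (Connection conn = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, partition.topic());
            ps.setInt(2, partition.partition());
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        } catch (SQLException e) {
            throw new RuntimeException("failed to load offset for " + partition, e);
        }
    }

    // Upsert the offsets of all partitions in one batch (REPLACE INTO is MySQL-specific)
    public static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        String sql = "REPLACE INTO offsets (topic, part, off) VALUES (?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Map.Entry<TopicPartition, Long> entry : currentOffset.entrySet()) {
                ps.setString(1, entry.getKey().topic());
                ps.setInt(2, entry.getKey().partition());
                ps.setLong(3, entry.getValue());
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException("failed to commit offsets " + currentOffset, e);
        }
    }
}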