一:Consumer API
1.自动提交程序
不建议在实际生产中使用这种方式:offset会按固定时间间隔自动提交,与消息是否处理成功无关,处理失败的消息可能因此丢失
/** * 简单的消费kafka消息,自动提交 * 消费过的数据再消费不到了 */ public static void helloConsumer() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); // 订阅 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } }
2.手动提交offset
如果业务处理失败,没有提交offset,那么在消费者重启或发生rebalance之后,还能从上次已提交的位置重新获取到这批数据
/** * 手动提交 */ public static void commitedOffset() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); // 订阅 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } // 手动提交,for循环成功才执行;不然不执行,在下一次还会再拉取数据 consumer.commitAsync(); } }
3.ConsumerGroup
单个分区的消息只能由ConsumerGroup中的某一个Consumer消费
Consumer对partition中消息的消费是顺序的,默认从头开始
单个ConsumerGroup会消费所有partition中的消息
4.特性
5.按照partition维度进行处理
/** * 按照patition维度进行处理 */ public static void commitedOffsetWithPartition() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); // 订阅 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 操作维度是partition了,每个partition单独处理 for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); for (ConsumerRecord<String, String> record : pRecords) { System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } long lastOffset = pRecords.get(pRecords.size() - 1).offset(); // 手动提交 Map<TopicPartition, OffsetAndMetadata> offset = new HashMap(); offset.put(partition, new OffsetAndMetadata(lastOffset + 1)); consumer.commitSync(offset); } } }
6.只消费某个partition
/** * 订阅topic下的partition中的内容 * */ public static void commitedOffsetWithTopicPartition() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1); // 订阅partition consumer.assign(Arrays.asList(p1)); while (true) { // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 操作维度是partition了,每个partition单独处理 for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); for (ConsumerRecord<String, String> record : pRecords) { System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } long lastOffset = pRecords.get(pRecords.size() - 1).offset(); // 手动提交 Map<TopicPartition, OffsetAndMetadata> offset = new HashMap(); offset.put(partition, new OffsetAndMetadata(lastOffset + 1)); consumer.commitSync(offset); } } }
二:Consumer API的多线程处理
1.第一种方式
2.程序
package com.jun.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class ConsumerThreadSample { private final static String TOPIC_NAME="caojun-topic"; /* 这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全 */ public static void main(String[] args) throws InterruptedException { KafkaConsumerRunner r1 = new KafkaConsumerRunner(); Thread t1 = new Thread(r1); t1.start(); Thread.sleep(15000); r1.shutdown(); } public static class KafkaConsumerRunner implements Runnable{ private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public KafkaConsumerRunner() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.19.129:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1); consumer.assign(Arrays.asList(p0,p1)); } public void run() { try { while(!closed.get()) { //处理消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecord = records.records(partition); // 处理每个分区的消息 for 
(ConsumerRecord<String, String> record : pRecord) { System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 返回去告诉kafka新的offset long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // 注意加1 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } }catch(WakeupException e) { if(!closed.get()) { throw e; } }finally { consumer.close(); } } public void shutdown() { closed.set(true); consumer.wakeup(); } } }
3.第二种方式
这种方式下,消息交给线程池异步处理,无法在处理完成后精确地手动提交offset(只能依赖自动提交),只是为了快速消费数据
4.程序
package com.jun.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ConsumerRecordThreadSample { private final static String TOPIC_NAME = "caojun-topic"; public static void main(String[] args) throws InterruptedException { String brokerList = "192.168.19.129:9092"; String groupId = "test"; int workerNum = 5; CunsumerExecutor consumers = new CunsumerExecutor(brokerList, groupId, TOPIC_NAME); consumers.execute(workerNum); Thread.sleep(1000000); consumers.shutdown(); } // Consumer处理 public static class CunsumerExecutor{ private final KafkaConsumer<String, String> consumer; private ExecutorService executors; public CunsumerExecutor(String brokerList, String groupId, String topic) { Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("group.id", groupId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); } public void execute(int workerNum) { executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true) { ConsumerRecords<String, String> records = consumer.poll(200); for (final ConsumerRecord record : records) { executors.submit(new ConsumerRecordWorker(record)); } } } public void shutdown() { if (consumer != 
null) { consumer.close(); } if (executors != null) { executors.shutdown(); } try { if (!executors.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("Timeout.... Ignore for this case"); } } catch (InterruptedException ignored) { System.out.println("Other thread interrupted this shutdown, ignore for this case."); Thread.currentThread().interrupt(); } } } // 记录处理 public static class ConsumerRecordWorker implements Runnable { private ConsumerRecord<String, String> record; public ConsumerRecordWorker(ConsumerRecord record) { this.record = record; } @Override public void run() { // 假如说数据入库操作 System.out.println("Thread - "+ Thread.currentThread().getName()); System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
三:一些其他的特性
1.手动控制offset
/** * 手动指定offset的起始位置,手动提交offset * * 手动指定offset起始位置 * 1、人为控制offset起始位置 * 2、如果出现程序错误,重复消费一次 * * 步骤 * 1、第一次从0消费【一般情况】 * 2、比如一次消费了100条, offset置为101并且存入Redis * 3、每次poll之前,从redis中获取最新的offset位置 * 4、每次从这个位置开始消费 * * 建议 * 1.使用redis进行保存 */ public static void controllerOffset() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); // 订阅partition consumer.assign(Arrays.asList(p0)); while (true) { // 设置offset consumer.seek(p0, 5); // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 操作维度是partition了,每个partition单独处理 for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); for (ConsumerRecord<String, String> record : pRecords) { System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } long lastOffset = pRecords.get(pRecords.size() - 1).offset(); // 手动提交 Map<TopicPartition, OffsetAndMetadata> offset = new HashMap(); offset.put(partition, new OffsetAndMetadata(lastOffset + 1)); consumer.commitSync(offset); } } }
2.限流
/** * 限流 */ public static void controllerLimit() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.19.129:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "false"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer(properties); TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1); long totalNum = 100; // 订阅partition consumer.assign(Arrays.asList(p0, p1)); while (true) { // 1000毫秒拉取一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 操作维度是partition了,每个partition单独处理 for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); long num = 0; for (ConsumerRecord<String, String> record : pRecords) { System.out.printf("partition= %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); /* 1、接收到record信息以后,去令牌桶中拿取令牌 2、如果获取到令牌,则继续业务处理 3、如果获取不到令牌, 则pause等待令牌 4、当令牌桶中的令牌足够, 则将consumer置为resume状态 */ num++; if(record.partition() == 0){ if(num >= totalNum){ consumer.pause(Arrays.asList(p0)); } } if(record.partition() == 1){ if(num == 40){ consumer.resume(Arrays.asList(p0)); } } } long lastOffset = pRecords.get(pRecords.size() - 1).offset(); // 手动提交 Map<TopicPartition, OffsetAndMetadata> offset = new HashMap(); offset.put(partition, new OffsetAndMetadata(lastOffset + 1)); consumer.commitSync(offset); } } }