Practical Kafka API Examples
Author: 尹正杰
Copyright notice: original work, reproduction without permission is prohibited; violations will be pursued legally.
一.Producer API
1>.Message sending workflow
Kafka's producer sends messages asynchronously. The send path involves two threads, the main thread and the Sender thread, plus a shared buffer, the RecordAccumulator.
The main thread hands messages to the RecordAccumulator;
the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.
Related parameters:
batch.size:
    The Sender only ships a batch once batch.size bytes of data have accumulated.
linger.ms:
    If a batch never fills up to batch.size, the Sender sends whatever it has after waiting linger.ms.
2>.Sending data asynchronously - API example without a callback
package com.yinzhengjie.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args){
        /**
         * Classes we need:
         *   KafkaProducer:
         *     the producer object used to send data
         *   ProducerConfig:
         *     supplies the keys for the required configuration parameters
         *   ProducerRecord:
         *     every message must be wrapped in a ProducerRecord object
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Call the producer's send method to send data
        for (int i = 1; i <= 10000; i++){
            producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", i + "", "message-" + i));
        }

        // Close the producer
        producer.close();
    }
}
[root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka
......
message-9402
message-9412
message-9447
message-9453
message-9462
message-9475
message-9477
message-9486
message-9493
message-9528
message-9545
message-9548
message-9613
message-9616
message-9622
message-9644
message-9646
message-9655
message-9662
message-9689
message-9700
message-9727
message-9746
message-9780
message-9783
message-9784
message-9791
message-9806
message-9812
message-9829
message-9854
message-9864
message-9898
message-9901
message-9951
message-9994
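Notice that the message numbers do not arrive in strict ascending order. The topic evidently has more than one partition: each record's key determines which partition it lands in, and Kafka only guarantees ordering within a single partition, so the console consumer interleaves messages from different partitions.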
3>.Sending data asynchronously - API with a callback
package com.yinzhengjie.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerCallback {
    public static void main(String[] args){
        /**
         * Classes we need:
         *   KafkaProducer:
         *     the producer object used to send data
         *   ProducerConfig:
         *     supplies the keys for the required configuration parameters
         *   ProducerRecord:
         *     every message must be wrapped in a ProducerRecord object
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Call the producer's send method to send data
        for (int i = 100; i <= 200; i++){
            producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", Integer.toString(i), "Message-callback-" + Integer.toString(i)), (recordMetadata, exception) -> {
                /**
                 * The callback runs asynchronously when the producer receives the ack. Its two parameters are a
                 * RecordMetadata and an Exception: if the Exception is null, the message was sent successfully;
                 * otherwise the send failed.
                 *
                 * Tip:
                 *   Failed sends are retried automatically, so there is no need to retry by hand in the callback.
                 */
                if (exception == null){
                    System.out.println("message send successful!");
                } else {
                    exception.printStackTrace();
                }
            });
        }

        // Close the producer
        producer.close();
    }
}
[root@kafka201.yinzhengjie.com ~]# kafka-console-consumer.sh --bootstrap-server kafka201.yinzhengjie.com:9092 --topic yinzhengjie-kafka
Message-callback-107
Message-callback-114
Message-callback-127
Message-callback-141
Message-callback-153
Message-callback-174
Message-callback-180
Message-callback-191
Message-callback-104
Message-callback-133
Message-callback-168
Message-callback-109
Message-callback-120
Message-callback-121
Message-callback-124
Message-callback-135
Message-callback-144
Message-callback-145
Message-callback-156
Message-callback-181
Message-callback-111
Message-callback-147
Message-callback-161
Message-callback-165
Message-callback-185
Message-callback-189
Message-callback-129
Message-callback-148
Message-callback-151
Message-callback-152
Message-callback-175
Message-callback-192
Message-callback-134
Message-callback-154
Message-callback-186
Message-callback-105
Message-callback-142
Message-callback-187
Message-callback-194
Message-callback-137
Message-callback-140
Message-callback-150
Message-callback-102
Message-callback-115
Message-callback-123
Message-callback-143
Message-callback-163
Message-callback-197
Message-callback-106
Message-callback-118
Message-callback-139
Message-callback-146
Message-callback-162
Message-callback-167
Message-callback-171
Message-callback-176
Message-callback-116
Message-callback-130
Message-callback-131
Message-callback-136
Message-callback-182
Message-callback-195
Message-callback-112
Message-callback-119
Message-callback-126
Message-callback-172
Message-callback-184
Message-callback-113
Message-callback-138
Message-callback-149
Message-callback-158
Message-callback-169
Message-callback-198
Message-callback-103
Message-callback-122
Message-callback-125
Message-callback-190
Message-callback-196
Message-callback-199
Message-callback-108
Message-callback-159
Message-callback-166
Message-callback-177
Message-callback-193
Message-callback-101
Message-callback-110
Message-callback-200
Message-callback-157
Message-callback-160
Message-callback-173
Message-callback-178
Message-callback-188
Message-callback-100
Message-callback-117
Message-callback-128
Message-callback-132
Message-callback-155
Message-callback-164
Message-callback-170
Message-callback-179
Message-callback-183
4>.Future test example
package com.yinzhengjie.kafka.producer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    public static void main(String[] args) throws Exception{
        // Create a thread pool
        ExecutorService executor = Executors.newCachedThreadPool();

        // Submit a task
        Future<?> future = executor.submit(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    System.out.println("i = " + i);
                }
            }
        });

        // The call below blocks the current thread until the submitted task has finished
        future.get();

        System.out.println("=================");

        // Shut down the thread pool
        executor.shutdown();
    }
}
5>.Sending data synchronously
package com.yinzhengjie.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /**
         * Classes we need:
         *   KafkaProducer:
         *     the producer object used to send data
         *   ProducerConfig:
         *     supplies the keys for the required configuration parameters
         *   ProducerRecord:
         *     every message must be wrapped in a ProducerRecord object
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);   // wait up to 1 second before shipping a batch; the unit is milliseconds

        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Call the producer's send method to send data
        for (int i = 1; i <= 10; i++){
            /**
             * Synchronous sending means that after a message is sent, the current thread blocks until the ack comes back.
             * Since the send method returns a Future object, we get the synchronous effect simply by calling
             * the Future's get method.
             */
            RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", Integer.toString(i), "message-" + i)).get();
            System.out.println("offset = " + metadata.offset());
        }

        // Close the producer
        producer.close();
    }
}
二.Consumer API
Reliability on the consumer side is easy to guarantee, because data is persisted in Kafka, so there is no need to worry about losing it.
However, a consumer may fail mid-consumption (power loss, machine crash, and so on); after recovering it must resume from where it left off, so the consumer has to keep a running record of which offset it has consumed up to.
Maintaining the offset is therefore a question every consumer must address.
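To make this concrete, here is a minimal sketch (not taken from the examples that follow; the class name and partition number are chosen for illustration) of how a consumer can ask the broker for the offset its group last committed and resume from it after a restart, using the same kafka-clients API as the rest of this section:

package com.yinzhengjie.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class ResumeFromCommittedOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yinzhengjie2020");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Assign partition 0 explicitly so we can seek on it directly
        TopicPartition tp = new TopicPartition("yinzhengjie-kafka", 0);
        consumer.assign(Arrays.asList(tp));

        // Ask the broker for the offset this group last committed for the partition;
        // null means the group has never committed anything for it
        OffsetAndMetadata committed = consumer.committed(tp);
        if (committed != null) {
            // The committed offset is the next record to read, so seeking to it resumes exactly where we stopped
            consumer.seek(tp, committed.offset());
        }

        // ... poll as usual from here on ...
        consumer.close();
    }
}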
1>.Committing offsets manually
package com.yinzhengjie.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args){
        /**
         * Classes we need:
         *   KafkaConsumer:
         *     the consumer object used to consume data
         *   ConsumerConfig:
         *     supplies the keys for the required configuration parameters
         *   ConsumerRecord:
         *     every record consumed is wrapped in a ConsumerRecord object
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yinzhengjie2020");    // specify the consumer group; all consumers with the same group.id belong to the same group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");    // disable automatic offset commits; commits are automatic by default, i.e. the default value is true

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("yinzhengjie-kafka"));

        // Poll in a loop
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Topic = " + record.topic() + ",Offset = " + record.offset() + ",Value = " + record.value());
            }

            // Commit the offset manually. Since auto commit is disabled above, skipping this step means you
            // will see duplicate records after the consumer process is restarted.
            // (In a real application you would pick one of the two commit styles below, not both.)
            /**
             * Synchronous commit: this method has a retry mechanism and keeps retrying until the commit succeeds.
             */
            consumer.commitSync();

            /**
             * Asynchronous commit: attempted only once, with no retry on failure. It is the recommended choice
             * in production because it is more efficient.
             *
             * Tip:
             *   A single failed async commit does no harm, because consuming the next batch triggers another
             *   async commit; as long as a later commit succeeds, the earlier failure has no effect.
             *   If async commits keep failing, however, records may end up being consumed more than once.
             */
            consumer.commitAsync();
        }
    }
}
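As the comments in the example note, commitAsync does not retry. If you at least want to know when an async commit fails, the no-argument call can be swapped for the callback variant. The fragment below is a sketch of a drop-in replacement for the consumer.commitAsync() line above (it additionally needs imports for OffsetCommitCallback, OffsetAndMetadata, TopicPartition, and java.util.Map):

// Asynchronous commit with a completion callback, so failures are at least visible
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // A single failure is usually superseded by the next commit, but repeated failures are worth investigating
            System.err.println("Async offset commit failed for " + offsets + ": " + exception.getMessage());
        }
    }
});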
[root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka
>hello
>world
>https://www.cnblogs.com/yinzhengjie/
>
2>.Committing offsets automatically
package com.yinzhengjie.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class AutoConsumer {
    public static void main(String[] args){
        /**
         * Classes we need:
         *   KafkaConsumer:
         *     the consumer object used to consume data
         *   ConsumerConfig:
         *     supplies the keys for the required configuration parameters
         *   ConsumerRecord:
         *     every record consumed is wrapped in a ConsumerRecord object
         *
         * So that we can focus on our own business logic, Kafka can commit offsets automatically.
         * The relevant parameters are:
         *   enable.auto.commit:
         *     whether automatic offset committing is enabled
         *   auto.commit.interval.ms:
         *     the interval between automatic offset commits
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yinzhengjie2020");           // specify the consumer group; all consumers with the same group.id belong to the same group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");            // enable automatic offset commits; this is the default behavior, i.e. the default value is true
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");       // commit offsets automatically every 1 second

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("yinzhengjie-kafka"));

        // Poll in a loop
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Topic = %s, offset = %d, key = %s, value = %s%n", record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}
[root@kafka201.yinzhengjie.com ~]# kafka-console-producer.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --topic yinzhengjie-kafka
>https://www.cnblogs.com/yinzhengjie/
>https://home.cnblogs.com/u/yinzhengjie2020
>hello
>world
>
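One caveat with automatic commits: they fire on a timer rather than when your processing finishes, so an offset can be committed for records the application has not actually handled yet (loss on restart), or processing can outrun the commit (duplicates on restart). That trade-off is exactly why the manual commit above and the custom storage below exist.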
3>.Custom offset storage approach
package com.yinzhengjie.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class CustomOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put("group.id", "yinzhengjie2020");   // consumer group; all consumers with the same group.id belong to the same group
        props.put("enable.auto.commit", "false");   // disable automatic offset commits; we store offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("yinzhengjie-kafka"), new ConsumerRebalanceListener() {
            // Called before a rebalance: this is where you would commit the offsets of the partitions
            // this consumer currently owns (here we only print them)
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("===== Revoked partitions =====");
                for (TopicPartition partition : partitions) {
                    System.out.printf("Partition = %s%n", partition);
                }
            }

            // Called after a rebalance: look up the stored offset of each newly assigned partition and seek to it
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("===== Newly assigned partitions =====");
                for (TopicPartition partition : partitions) {
                    System.out.printf("Partition = %s%n", partition);
                    // Pseudocode below; implement it yourself:
                    // Long offset = getPartitionOffset(partition);
                    // consumer.seek(partition, offset);
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Topic = %s, offset = %d,value = %s%n", record.topic(), record.offset(), record.value());
                // Pseudocode below; implement it yourself:
                // TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                // commitOffset(topicPartition, record.offset() + 1);
            }
        }
    }

    // Commit the offset; implement this to fit your own business scenario
    private static void commitOffset(TopicPartition topicPartition, long offset) {
    }

    // Fetch a partition's stored offset; implement this to fit your own business scenario
    private static Long getPartitionOffset(TopicPartition partition) {
        return null;
    }
}
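The two stubs at the bottom of the class are deliberately left empty. Purely to illustrate the wiring, the sketch below backs them with an in-memory map; a real implementation would use a durable store (typically a database table keyed by group/topic/partition, written in the same transaction as the processing results), since an in-memory map obviously does not survive a restart:

// Illustration only: an in-memory offset store (add imports for java.util.HashMap and java.util.Map).
// In production, replace the map with a durable store written atomically with your processing results.
private static final Map<TopicPartition, Long> offsetStore = new HashMap<>();

// Save the next offset to consume for one partition (callers pass record.offset() + 1)
private static void commitOffset(TopicPartition topicPartition, long offset) {
    offsetStore.put(topicPartition, offset);
}

// Look up a partition's saved offset; null means no position has been stored yet
private static Long getPartitionOffset(TopicPartition partition) {
    return offsetStore.get(partition);
}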
三.Custom Interceptors
1>.How interceptors work
The producer interceptor was introduced in Kafka 0.10 and is mainly used to implement custom control logic on the clients side.
For the producer, interceptors give users a chance to customize messages (for example, to modify them) before they are sent and before the producer's callback logic runs.
The producer also lets users specify multiple interceptors that act on the same message in order, forming an interceptor chain.
Interceptors implement the org.apache.kafka.clients.producer.ProducerInterceptor interface, which defines the following methods:
configure(configs):
    Called to fetch configuration and initialize state.
onSend(ProducerRecord):
    This method is wrapped inside KafkaProducer.send, i.e. it runs on the user's main thread.
    The producer guarantees it is called before the message is serialized and before its partition is computed.
    You may do anything to the message here, but it is best not to modify the topic or partition the message belongs to, or the target-partition computation will be affected.
onAcknowledgement(RecordMetadata, Exception):
    Called after the message has been successfully sent from the RecordAccumulator to the Kafka broker, or when the send fails, and normally before the producer's callback logic is triggered.
    onAcknowledgement runs on the producer's I/O (sender) thread, so do not put heavy logic in it, or you will drag down the producer's send throughput.
close:
    Closes the interceptor; mainly used to perform resource cleanup.
Tips:
    As noted above, interceptors may run on multiple threads, so when implementing one you must ensure thread safety yourself.
    Also, if multiple interceptors are specified, the producer invokes them in the given order, and it merely catches any exception an interceptor throws and writes it to the error log instead of propagating it upward; keep this in mind when using interceptor chains.
2>.Interceptor example
package com.yinzhengjie.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    // These counters are only updated from onAcknowledgement, which runs on the single sender I/O thread;
    // if your interceptor touches state from several threads, use thread-safe types such as AtomicLong.
    private long successNum = 0L;
    private long errorNum = 0L;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    // Count successes and failures
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successNum++;
        } else {
            errorNum++;
        }
    }

    @Override
    public void close() {
        System.out.println("successNum=" + successNum);
        System.out.println("errorNum=" + errorNum);
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
package com.yinzhengjie.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
    // Prepend a timestamp to every record's value
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
        System.out.println("Timestamps have been prepended to the records....");
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
package com.yinzhengjie.kafka.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args){
        /**
         * Classes we need:
         *   KafkaProducer:
         *     the producer object used to send data
         *   ProducerConfig:
         *     supplies the keys for the required configuration parameters
         *   ProducerRecord:
         *     every message must be wrapped in a ProducerRecord object
         */
        // Create a Properties object holding the Kafka cluster configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka201.yinzhengjie.com:9092,kafka202.yinzhengjie.com:9092,kafka203.yinzhengjie.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // Register the interceptors; they are invoked in the order given here
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.yinzhengjie.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.yinzhengjie.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Call the producer's send method to send data
        for (int i = 3000; i <= 6000; i++){
            producer.send(new ProducerRecord<String, String>("yinzhengjie-kafka", Integer.toString(i), "message-" + i));
        }

        // Note: closing the producer also invokes each interceptor's close() method
        producer.close();

        System.out.println("===== Producer program finished =====");
    }
}