本文主要介绍如何使用Java API操作Kafka,文中用到的软件版本:Java 1.8.0_191、Kafka 2.13-2.4.1(即基于Scala 2.13构建的Kafka 2.4.1)、kafka-clients 2.5.0、kafka-streams 2.5.0、junit 4.13。
1、引入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency>
2、生产者
package com.inspur.demo.general.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Properties; /** * kafka生产者示例 */ public class ProducerCase { private Properties props; private Producer producer; @Before public void before() { props = new Properties(); props.put("bootstrap.servers", "10.49.196.10:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed" //props.put("retries", "10");//连接失败重试次数 //props.put("batch.size", "16384");//每个分区缓冲区大小,当数据大小达到设定值后,就会立即发送,不顾下面的linger.ms //props.put("linger.ms", "1");//producer将会等待给定的延迟时间以允许其他消息记录一起发送,默认为0 //props.put("buffer.memory", "33554432");//producer缓冲区大小 //props.put("max.block.ms", "60000");//当缓冲区满了,发送消息时最大等待时间 } @After public void after() throws Exception { producer.close(); } /** * 简单使用 */ @Test public void simpleUse() throws Exception { producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { //发送key和value producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "消息-" + i)); //只发送value producer.send(new ProducerRecord<String, String>("test", "消息2-" + i)); } } /** * 以事务方式发送消息 * @throws Exception */ @Test public void transactional() throws Exception { props.put("transactional.id", "tid123");//必须设置,不同的生产者需要设置不同的事务id producer = new KafkaProducer<>(props); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 10; i++) { //发送key和value 
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "消息-" + i)); //只发送value producer.send(new ProducerRecord<String, String>("test", "消息2-" + i)); } producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } } }
3、消费者
package com.inspur.demo.general.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * kafka消费者示例 */ public class ConsumerCase { private Properties props; private Consumer consumer; @Before public void before() { props = new Properties(); props.put("bootstrap.servers", "10.49.196.10:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("group.id", "test"); //props.put("retries", "10");//连接失败重试次数 //props.put("batch.size", "16384");//每个分区缓冲区大小,当数据大小达到设定值后,就会立即发送,不顾下面的linger.ms //props.put("linger.ms", "1");//producer将会等待给定的延迟时间以允许其他消息记录一起发送,默认为0 //props.put("buffer.memory", "33554432");//producer缓冲区大小 //props.put("max.block.ms", "60000");//当缓冲区满了,发送消息时最大等待时间 } @After public void after() throws Exception { consumer.close(); } /** * 自动提交 */ @Test public void automatic() throws Exception { props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, 
String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value()); } } } /** * 手动提交 * @throws Exception */ @Test public void manual() throws Exception { props.setProperty("enable.auto.commit", "false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value()); } //insertIntoDb(records);//具体业务处理 consumer.commitSync(); } } }
4、流处理
package com.inspur.demo.general.kafka; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; /** * 单词统计 */ public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))//把每条消息拆成一个个单词 .groupBy((key, value) -> value)//根据单词分组 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//计算每个单词的个数并保存在名为"counts-store"的KeyValueStore中 .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//设置输出类型,键为String,值为long final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); streams.start(); latch.await(); } }
程序从主题streams-plaintext-input中读取消息,把每条消息拆分成单词并统计各单词出现的次数,然后把统计结果发送到主题streams-wordcount-output;可用如下命令查看该主题的输出情况:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
输入如下:
输出如下:
5、Admin
package com.inspur.demo.general.kafka; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.config.ConfigResource; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Properties; public class AdminCase { private AdminClient adminClient; @Before public void before() { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092"); adminClient = AdminClient.create(props); } @After public void after() { adminClient.close(); } /** * 创建主题 */ @Test public void createTopics() { NewTopic topic = new NewTopic("admin-test", 4, (short) 1);//分区为4,副本为1 Collection<NewTopic> topicList = new ArrayList<>(); topicList.add(topic); adminClient.createTopics(topicList); } /** * 列出主题 * @throws Exception */ @Test public void listTopics() throws Exception { ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); listTopicsOptions.listInternal(true);//是否罗列内部主题 ListTopicsResult result = adminClient.listTopics(listTopicsOptions); Collection<TopicListing> list = result.listings().get(); System.out.println(list); } /** * 查看主题详情 * @throws Exception */ @Test public void describeTopics() throws Exception { DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("admin-test")); System.out.println(result.all().get()); } /** * 删除主题 * @throws Exception */ @Test public void deleteTopics() throws Exception { adminClient.deleteTopics(Arrays.asList("admin-test")); } /** * 查询集群信息 * @throws Exception */ @Test public void describeCluster() throws Exception { DescribeClusterResult result = adminClient.describeCluster(); System.out.println(result.nodes().get()); } /** * 查询配置信息 * @throws Exception */ @Test public void describeConfigs() throws Exception { 
DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(new ConfigResource(ConfigResource.Type.TOPIC, "admin-test"))); System.out.println(result.all().get()); } /** * 查询节点的日志目录信息 * @throws Exception */ @Test public void describeLogDirs() throws Exception { DescribeLogDirsResult result = adminClient.describeLogDirs(Arrays.asList(0));//查询broker.id为0的节点 System.out.println(result.all().get()); } /** * 查询副本的日志目录信息 * @throws Exception */ @Test public void describeReplicaLogDirs() throws Exception { DescribeReplicaLogDirsResult result = adminClient.describeReplicaLogDirs(Arrays.asList(new TopicPartitionReplica("admin-test", 0, 0))); System.out.println(result.all().get()); } }