1. Introduction to Kafka Streams
Kafka Streams is a client library for processing and analyzing data stored in Kafka.
Through state stores, Kafka Streams supports efficient stateful operations.
It offers both low-level Processor API primitives (sketched in section 3 below) and a high-level DSL (used in section 5).
2. Kafka High-Level Architecture Diagram
3. Kafka Streams Key Concepts
Streams and stream processors
Stream processing topology
Source processors and sink processors (illustrated in the sketch below)
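To make these concepts concrete, here is a minimal sketch using the classic (pre-3.0) Processor API; the class names and the uppercase transform are illustrative and not part of the original article. A source processor reads from a topic, a stream processor transforms each record, and a sink processor writes the results back out:

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class TopologySketch {

    // Stream processor: uppercases each value and forwards it downstream
    static class UppercaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
        }
    }

    public static Topology build() {
        Topology topology = new Topology();
        // Source processor: consumes records from the input topic
        topology.addSource("Source", "larry-stream-in");
        // Stream processor node, wired to the source
        topology.addProcessor("Upper", UppercaseProcessor::new, "Source");
        // Sink processor: writes the transformed records to the output topic
        topology.addSink("Sink", "larry-stream-out", "Upper");
        return topology;
    }
}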
4. Creating the Topics
Using the Kafka API (e.g. AdminClient, sketched below), create two topics:
larry-stream-in
larry-stream-out
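The article does not show the creation code; here is a minimal sketch using Kafka's AdminClient, assuming one partition and replication factor 1 (adjust both for your cluster):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1 (assumed values)
            admin.createTopics(Arrays.asList(
                    new NewTopic("larry-stream-in", 1, (short) 1),
                    new NewTopic("larry-stream-out", 1, (short) 1)))
                 .all().get();   // block until both topics are created
        }
    }
}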
5. Defining the Stream Rule
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDefineRule {

    public static final String INPUT_TOPIC = "larry-stream-in";
    public static final String OUT_TOPIC = "larry-stream-out";

    public static void main(String[] args) throws Exception {
        defineStreamRule();
    }

    // Define the stream processing rule
    private static void defineStreamRule() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        JaasUtil.setProperties(properties);

        // Build the stream topology
        StreamsBuilder builder = new StreamsBuilder();
        // Add the word-count processing steps
        wordCountStream(builder);
        Topology topology = builder.build();
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, properties);

        // Run until shut down; the shutdown hook closes the streams and releases the latch
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static void wordCountStream(final StreamsBuilder builder) {
        KStream<String, String> source = builder.stream(INPUT_TOPIC);
        final KTable<String, Long> count = source
                // Split each line into lowercase words
                .flatMapValues(value -> Arrays.asList(
                        value.toLowerCase(Locale.getDefault()).split("\\W+")))
                // Re-key each record by the word itself
                .selectKey((key, word) -> word)
                .groupByKey()
                // .groupBy((key, value) -> value)   // equivalent alternative
                .count(Materialized.as("counts-store"));
        count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
}
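Two notes on the code above. First, selectKey((key, word) -> word) followed by groupByKey() re-keys each record by the word and is equivalent to the commented-out groupBy((key, value) -> value); either form triggers a repartition before counting, and Materialized.as("counts-store") names the local state store that backs the resulting KTable. Second, JaasUtil is the author's own helper and is not shown in the article; a hypothetical sketch, assuming the brokers use SASL/PLAIN authentication:

import java.util.Properties;

// Hypothetical sketch of the author's JaasUtil (not shown in the original);
// assumed to attach SASL/PLAIN client settings to the given Properties.
public class JaasUtil {
    public static void setProperties(Properties props) {
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // <user> and <password> are placeholders for real credentials
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"<user>\" password=\"<password>\";");
    }
}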
6. Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

public class WordCountProducer {

    public static void main(String[] args) throws Exception {
        produce();
    }

    private static void produce() throws Exception {
        // Producer configuration; see http://kafka.apache.org/documentation/#producerapi
        Properties props = new Properties();
        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Number of automatic retries on transient failure (0 = none)
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms so small requests can be batched together
        props.put("linger.ms", 1);
        // Total memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        JaasUtil.setProperties(props);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // Read a text file and send it line by line
        File file = new File("E:\\files\\kafka\\wordCount.txt");
        BufferedReader reader = new BufferedReader(new FileReader(file));
        String tempString = null;
        while ((tempString = reader.readLine()) != null) {
            producer.send(new ProducerRecord<String, String>(WordCountDefineRule.INPUT_TOPIC, tempString));
            Thread.sleep(1000);
        }
        reader.close();

        /*
         * Alternatively, send numbered records:
         * for (int i = 0; i < 10; i++) {
         *     producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
         * }
         */

        System.out.println("Messages sent successfully");
        producer.close();
    }
}
Contents of wordCount.txt:
Hello Word Hello Tom Hello Nice Tom Nice Word
7. Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class WordCountConsumer {

    public static void main(String[] args) {
        // Kafka consumer configuration settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The out topic carries Long counts, so the value deserializer (and the
        // consumer's value type parameter) must be Long, not String
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        JaasUtil.setProperties(props);

        KafkaConsumer<String, Long> kafkaConsumer = new KafkaConsumer<String, Long>(props);
        // Subscribe to the word-count output topic
        kafkaConsumer.subscribe(Arrays.asList(WordCountDefineRule.OUT_TOPIC));

        while (true) {
            ConsumerRecords<String, Long> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                // Print the offset, key, and value of each record
                System.out.printf("offset = %d, key = %s, value = %d%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
Output of the word-count results. Because the out topic carries the KTable's changelog, the consumer sees incremental updates (for example, hello may appear with counts 1, 2, then 3, depending on caching and the commit interval); for the sample input the final counts are hello = 3, word = 2, tom = 2, nice = 2.