1. Introduction to Kafka Streams
Kafka Streams is a client library for processing and analyzing data stored in Kafka.
Kafka Streams supports efficient stateful operations through state stores.
It provides both a low-level Processor API and a high-level DSL.
2. Kafka high-level architecture diagram

3. Kafka Streams key terms
Streams and stream processors
Stream processing topologies
Source processors and sink processors (see the sketch below)
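
To make these terms concrete, here is a minimal Processor API sketch (a sketch only: the class name UpperProcessor is illustrative, and it uses the classic org.apache.kafka.streams.processor.Processor interface). A source processor reads records from an input topic, a custom stream processor transforms each record, and a sink processor writes the results back to Kafka; together they form a stream processing topology.

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ProcessorApiSketch {

    // Illustrative stream processor: upper-cases each value and forwards it downstream
    static class UpperProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            context.forward(key, value.toUpperCase());
        }

        @Override
        public void close() {
        }
    }

    public static void main(String[] args) {
        // source processor -> stream processor -> sink processor = a topology
        Topology topology = new Topology();
        topology.addSource("Source", "larry-stream-in");
        topology.addProcessor("Upper", UpperProcessor::new, "Source");
        topology.addSink("Sink", "larry-stream-out", "Upper");
        System.out.println(topology.describe());
    }
}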
4. Create the topics
Use the Kafka API to create two topics:
larry-stream-in
larry-stream-out
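
The topics can be created with the AdminClient API. A minimal sketch, assuming a single-broker demo setup (1 partition, replication factor 1) and reusing the broker address and the project's JaasUtil security helper from the rest of this post:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        // Project-specific helper (presumably JAAS/security settings)
        JaasUtil.setProperties(props);
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition / replication factor 1 are assumptions for a single-broker demo
            admin.createTopics(Arrays.asList(
                    new NewTopic("larry-stream-in", 1, (short) 1),
                    new NewTopic("larry-stream-out", 1, (short) 1)))
                .all().get(); // block until both topics exist
        }
    }
}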
5. Define the stream rules
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountDefineRule {

    public static final String INPUT_TOPIC = "larry-stream-in";
    public static final String OUT_TOPIC = "larry-stream-out";

    public static void main(String[] args) throws Exception {
        defineStreamRule();
    }

    // Define the stream rules
    private static void defineStreamRule() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Project-specific helper (presumably JAAS/security settings)
        JaasUtil.setProperties(properties);

        // Build the stream topology
        StreamsBuilder builder = new StreamsBuilder();
        // Add the word count processing steps
        wordCountStream(builder);
        Topology topology = builder.build();
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, properties);
        // Block the main thread until the JVM is shut down
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static void wordCountStream(final StreamsBuilder builder) {
        KStream<String, String> source = builder.stream(INPUT_TOPIC);
        final KTable<String, Long> count = source
                // split each line into lower-case words (note the escaped regex)
                .flatMapValues(value -> Arrays.asList(
                        value.toLowerCase(Locale.getDefault()).split("\\W+")))
                // use the word itself as the record key
                .selectKey((key, word) -> word)
                .groupByKey()
                // .groupBy((key, value) -> value)
                .count(Materialized.as("counts-store"));
        count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
}
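
Because the counts are materialized in the "counts-store" state store, they can also be read directly through interactive queries instead of (or in addition to) consuming the output topic. A minimal sketch, assuming Kafka Streams 2.5+ (for StoreQueryParameters) and a streams instance that has reached the RUNNING state:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Dump the current word counts from the "counts-store" state store;
// call this after streams.start(), once the instance is RUNNING
static void printCounts(KafkaStreams streams) {
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("counts-store",
                    QueryableStoreTypes.<String, Long>keyValueStore()));
    try (KeyValueIterator<String, Long> it = store.all()) {
        it.forEachRemaining(kv -> System.out.println(kv.key + " = " + kv.value));
    }
}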
6. Producer
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WordCountProducer {

    public static void main(String[] args) throws Exception {
        produce();
    }

    private static void produce() throws Exception {
        // Producer configuration, see http://kafka.apache.org/documentation/#producerapi
        Properties props = new Properties();
        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // retries = 0: failed sends are not retried
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms so that records can be batched together
        props.put("linger.ms", 1);
        // buffer.memory controls the total memory available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Project-specific helper (presumably JAAS/security settings)
        JaasUtil.setProperties(props);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Read a text file and send it line by line (note the escaped Windows path)
        File file = new File("E:\\files\\kafka\\wordCount.txt");
        BufferedReader reader = new BufferedReader(new FileReader(file));
        String tempString = null;
        while ((tempString = reader.readLine()) != null) {
            producer.send(new ProducerRecord<String, String>(WordCountDefineRule.INPUT_TOPIC, tempString));
            Thread.sleep(1000);
        }
        reader.close();
        /*
         * Alternative: send ten numbered records instead of reading a file.
         * for (int i = 0; i < 10; i++) {
         *     producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
         * }
         */
        System.out.println("Message sent successfully");
        producer.close();
    }
}
Contents of wordCount.txt:
Hello Word Hello Tom Hello Nice Tom Nice Word
7. Consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WordCountConsumer {

    public static void main(String[] args) {
        // Kafka consumer configuration settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The counts are Longs, so the value deserializer (and the
        // consumer's generic types below) must be Long, not String
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        // Project-specific helper (presumably JAAS/security settings)
        JaasUtil.setProperties(props);

        KafkaConsumer<String, Long> kafkaConsumer = new KafkaConsumer<String, Long>(props);
        // Subscribe to the streams application's output topic
        kafkaConsumer.subscribe(Arrays.asList(WordCountDefineRule.OUT_TOPIC));
        while (true) {
            ConsumerRecords<String, Long> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                // Print the offset, key and value of each consumed record
                System.out.printf("offset = %d, key = %s, value = %d%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
Output of the word count
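For the single line in wordCount.txt above, the final counts work out to hello = 3, word = 2, tom = 2, nice = 2, so the consumer should eventually print lines like the following (the exact offsets, ordering, and number of intermediate updates depend on record caching and the commit interval):

offset = 0, key = hello, value = 3
offset = 1, key = word, value = 2
offset = 2, key = tom, value = 2
offset = 3, key = nice, value = 2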
