  • Getting Started with Kafka Streams Development (10)

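    1. Background

    The previous post introduced time windows in Kafka Streams along with a Tumbling Window example. In this post we use the KTable concept in Kafka Streams to compute the average rating of a set of movies in real time.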

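    2. What the demo does

    In this post we create a Kafka topic that carries movie rating events, then write a program that continuously computes each movie's current average rating. As before, we serialize the events with Protocol Buffers. In JSON form, the events look like this: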

    {"movie_id": 362, "rating": 9.6}
    {"movie_id": 362, "rating": 9.7}
    {"movie_id": 362, "rating": 8.6}
    

    When the Kafka Streams application processes these three events in order, it emits the following outputs, one per event:

    > 9.6
    > 9.65
    > 9.3
    

    3. Set up the project

    The first step is to create the project directory:

    $ mkdir aggregating-average
    $ cd aggregating-average
    

    Then, in the newly created aggregating-average directory, create the Gradle build file build.gradle with the following content:

    buildscript {
      repositories {
        jcenter()
      }
      dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
      }
    }
    plugins {
      id "java"
      id "com.google.protobuf" version "0.8.12"
    }
    apply plugin: 'com.github.johnrengelman.shadow'
    sourceCompatibility = "1.8"
    targetCompatibility = "1.8"
    version = "0.0.1"
    repositories {
      mavenCentral()
      jcenter()
      maven { url 'https://packages.confluent.io/maven/' }
    }
    group 'huxihx.kafkastreams'
    dependencies {
      implementation 'com.google.protobuf:protobuf-java:3.12.4'
      implementation 'org.slf4j:slf4j-simple:1.7.30'
      implementation 'org.apache.kafka:kafka-streams:2.5.0'
      implementation "com.typesafe:config:1.4.0"
    
      testImplementation group: 'junit', name: 'junit', version: '4.13'
    }
    
    protobuf {
      generatedFilesBaseDir = "$projectDir/src/"
      protoc {
        artifact = 'com.google.protobuf:protoc:3.12.4'
      }
    }
    jar {
      manifest {
        attributes(
            "Class-Path": configurations.runtimeClasspath.collect { it.getName() }.join(" "),
            "Main-Class": 'huxihx.kafkastreams.RunningAverage' 
        )
      }
    }
    shadowJar {
      archiveFileName = "aggregating-average-standalone-$version.$extension"
    }  

    We set huxihx.kafkastreams.RunningAverage as the application's main class. Save the file, then run the following command to generate the Gradle wrapper:

    $ gradle wrapper
    

    Next, create a subdirectory named configuration under aggregating-average to hold our configuration file, dev.properties:

    $ mkdir configuration
    $ cd configuration
    $ vi dev.properties
    

    The content of dev.properties is as follows:

    application.id=kafka-films
    request.timeout.ms=20000
    bootstrap.servers=localhost:9092
    retry.backoff.ms=500
    default.topic.replication.factor=1
    offset.reset.policy=latest
    input.ratings.topic.name=ratings
    input.ratings.topic.partitions=1
    input.ratings.topic.replication.factor=1
    output.rating-averages.topic.name=rating-averages
    output.rating-averages.topic.partitions=1
    output.rating-averages.topic.replication.factor=1

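    This configuration defines one input topic, ratings, and one output topic, rating-averages. The former carries the movie rating events, and the latter stores each movie's average rating.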
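
    As a rough illustration of how these keys are consumed, the sketch below reads the same topic settings with plain java.util.Properties. The application itself loads them through Typesafe Config, so this is only a stand-in; the class name TopicSettingsSketch, its helper methods, and the inlined properties text are assumptions made for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class TopicSettingsSketch {

    // Parses properties text; stands in for reading configuration/dev.properties from disk.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Summarizes one topic from its "<prefix>.name", "<prefix>.partitions"
    // and "<prefix>.replication.factor" keys.
    static String describe(Properties props, String prefix) {
        String name = props.getProperty(prefix + ".name");
        int partitions = Integer.parseInt(props.getProperty(prefix + ".partitions"));
        short replication = Short.parseShort(props.getProperty(prefix + ".replication.factor"));
        return name + " (partitions=" + partitions + ", replication=" + replication + ")";
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "input.ratings.topic.name=ratings",
                "input.ratings.topic.partitions=1",
                "input.ratings.topic.replication.factor=1",
                "output.rating-averages.topic.name=rating-averages",
                "output.rating-averages.topic.partitions=1",
                "output.rating-averages.topic.replication.factor=1");
        Properties props = parse(text);
        System.out.println(describe(props, "input.ratings.topic"));
        System.out.println(describe(props, "output.rating-averages.topic"));
    }
}
```

    The partition count and replication factor are parsed into int and short because those are the types the NewTopic constructor expects when the application pre-creates the topics.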

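    4. Create the message schemas

    Because we serialize with Protocol Buffers, we need to generate the Java classes that model our messages ahead of time. From the aggregating-average directory, run the following commands to create the folder that holds the schemas: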

    $ mkdir -p src/main/proto
    $ cd src/main/proto
    

    Then create a file named rating.proto in the proto folder with the following content:

    syntax = "proto3";
       
    package huxihx.kafkastreams.proto;
       
    message Rating {
        int64 movie_id = 1;
        double rating = 2;
    }
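
    To make the schema above more concrete, here is a small sketch that hand-encodes a Rating following the standard proto3 wire format: each field is written as a tag byte followed by its value, with movie_id as a varint and rating as a little-endian 64-bit double. It does not use the generated classes; the class RatingWireFormatSketch and its methods are hypothetical and exist only to show roughly what the serialized bytes look like.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class RatingWireFormatSketch {

    // Hand-encodes Rating{movie_id, rating} following the proto3 wire format.
    static byte[] encode(long movieId, double rating) {
        // Worst case: 1 tag byte + 10 varint bytes + 1 tag byte + 8 double bytes.
        ByteBuffer buf = ByteBuffer.allocate(20).order(ByteOrder.LITTLE_ENDIAN);
        if (movieId != 0) {            // proto3 omits fields that hold the default value (zero)
            buf.put((byte) 0x08);      // tag: field number 1, wire type 0 (varint)
            writeVarint(buf, movieId);
        }
        if (rating != 0.0) {
            buf.put((byte) 0x11);      // tag: field number 2, wire type 1 (64-bit)
            buf.putDouble(rating);     // IEEE 754, little-endian
        }
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }

    // Writes a base-128 varint: 7 payload bits per byte, high bit set on all but the last byte.
    static void writeVarint(ByteBuffer buf, long value) {
        while ((value & ~0x7FL) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    // Renders bytes as space-separated lowercase hex.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The first sample event: {"movie_id": 362, "rating": 9.6}
        System.out.println(toHex(encode(362L, 9.6)));
    }
}
```

    The whole event fits in 12 bytes, which is one reason a binary format like this is attractive for high-volume topics compared with the JSON rendering shown earlier.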
    

    Next, create countsum.proto to hold the count and sum needed to compute the average:

    syntax = "proto3";
       
    package huxihx.kafkastreams.proto;
       
    message CountAndSum {
        int64 count = 1;
        double sum = 2;
    }
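
    The reason for storing a count and a sum, rather than the average itself, is that both can be updated incrementally as each new rating arrives, and the average can be derived from them at any time. A minimal sketch of that idea in plain Java follows; the nested CountAndSum class here is a hand-written stand-in for the generated protobuf class, not the generated code itself.

```java
import java.util.Locale;

public class CountAndSumSketch {

    // Immutable pair mirroring the CountAndSum message: number of ratings and their total.
    static final class CountAndSum {
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }

        // Folds one more rating into the state and returns the new state.
        CountAndSum add(double rating) {
            return new CountAndSum(count + 1, sum + rating);
        }

        // Derives the average; only meaningful once at least one rating was added.
        double average() {
            return sum / count;
        }
    }

    public static void main(String[] args) {
        CountAndSum state = new CountAndSum(0L, 0.0);
        // The three sample ratings for movie 362.
        for (double rating : new double[] {9.6, 9.7, 8.6}) {
            state = state.add(rating);
            System.out.printf(Locale.ROOT, "count=%d sum=%.2f average=%.2f%n",
                    state.count, state.sum, state.average());
        }
    }
}
```

    Keeping only the previous average would not be enough, because updating an average correctly also requires knowing how many values it already covers.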
    

    After saving these files, run the gradlew command from the aggregating-average directory:

    $ ./gradlew build
    

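    At this point you should see two generated Java classes, RatingOuterClass and Countsum, under src/main/java/huxihx/kafkastreams/proto in the aggregating-average directory.

    5. Create the Serdes

    In this step we create Serdes for the messages in our topics. First, run the following command from the aggregating-average directory to create the folder for them: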

    $ mkdir -p src/main/java/huxihx/kafkastreams/serdes
    

    In the new serdes folder, create ProtobufSerializer.java with the following content:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.MessageLite;
    import org.apache.kafka.common.serialization.Serializer;
        
    public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            return data == null ? new byte[0] : data.toByteArray();
        }
    }
    

    Next, create ProtobufDeserializer.java:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
        
    import java.util.Map;
        
    public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
        
        private Parser<T> parser;
        
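        @Override
        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs, boolean isKey) {
            // The parser is handed over by ProtobufSerdes under the "parser" key.
            parser = (Parser<T>) configs.get("parser");
        }
        
        @Override
        public T deserialize(String topic, byte[] data) {
            // A null payload (for example a tombstone) has nothing to parse.
            if (data == null) {
                return null;
            }
            try {
                return parser.parseFrom(data);
            } catch (InvalidProtocolBufferException e) {
                throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
            }
        }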
    }
    

    Finally, create ProtobufSerdes.java:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;
        
    import java.util.HashMap;
    import java.util.Map;
        
    public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
        
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;
        
        public ProtobufSerdes(Parser<T> parser) {
            serializer = new ProtobufSerializer<>();
            deserializer = new ProtobufDeserializer<>();
            Map<String, Parser<T>> config = new HashMap<>();
            config.put("parser", parser);
            deserializer.configure(config, false);
        }
        
        @Override
        public Serializer<T> serializer() {
            return serializer;
        }
        
        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
    

    6. Write the main application

    Create RunningAverage.java to compute the average rating. Pay attention to how the getRatingAverageTable method in the code computes the average.

    package huxihx.kafkastreams;
    
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import huxihx.kafkastreams.proto.Countsum;
    import huxihx.kafkastreams.proto.RatingOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerdes;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicListing;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    public class RunningAverage {
    
        private static ProtobufSerdes<RatingOuterClass.Rating> ratingSerdes() {
            return new ProtobufSerdes<>(RatingOuterClass.Rating.parser());
        }
    
        private static ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes() {
            return new ProtobufSerdes<>(Countsum.CountAndSum.parser());
        }
    
        public static void main(String[] args) throws Exception {
            new RunningAverage().runRecipe();
        }
    
        private Properties loadEnvProperties() {
            final Config load = ConfigFactory.load();
            final Map<String, Object> map = load.entrySet().stream()
                    .filter(entry -> Stream.of("java", "user", "sun", "os", "http", "ftp", "file", "line", "awt", "gopher", "socks", "path")
                            .noneMatch(s -> entry.getKey().startsWith(s)))
                    .peek(filteredEntry -> System.out.println(filteredEntry.getKey() + ": " + filteredEntry.getValue().unwrapped()))
                    .collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().unwrapped()));
    
            Properties props = new Properties();
            props.putAll(map);
            return props;
        }
    
        private void runRecipe() throws Exception {
            Properties envProps = this.loadEnvProperties();
            Properties streamProps = this.createStreamsProperties(envProps);
            Topology topology = this.buildTopology(new StreamsBuilder(), envProps);
            this.preCreateTopics(envProps);
    
            final KafkaStreams streams = new KafkaStreams(topology, streamProps);
            final CountDownLatch latch = new CountDownLatch(1);
    
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close(Duration.ofSeconds(5));
                    latch.countDown();
                }
            });
    
            try {
                streams.cleanUp();
                streams.start();
                latch.await();
            } catch (Throwable e) {
                // Report the failure instead of exiting silently.
                e.printStackTrace();
                System.exit(1);
            }
            System.exit(0);
        }
    
        private static KTable<Long, Double> getRatingAverageTable(KStream<Long, RatingOuterClass.Rating> ratings,
                                                                  String avgRatingsTopicName,
                                                                  ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes) {
            KGroupedStream<Long, Double> ratingsById = ratings
                    .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating.getRating()))
                    .groupByKey(Grouped.with(Serdes.Long(), Serdes.Double()));
            final KTable<Long, Countsum.CountAndSum> ratingCountAndSum =
                    ratingsById.aggregate(() -> Countsum.CountAndSum.newBuilder().setCount(0L).setSum(0.0D).build(),
                            (key, value, aggregate) -> Countsum.CountAndSum.newBuilder().setCount(aggregate.getCount() + 1).setSum(aggregate.getSum() + value).build(),
                            Materialized.with(Serdes.Long(), countAndSumSerdes));
            final KTable<Long, Double> ratingAverage =
                    ratingCountAndSum.mapValues(value -> value.getSum() / value.getCount(), Materialized.as("average-ratings"));
            ratingAverage.toStream().to(avgRatingsTopicName);
            return ratingAverage;
        }
    
        private Topology buildTopology(StreamsBuilder builder, Properties envProps) {
            final String ratingTopicName = envProps.getProperty("input.ratings.topic.name");
            final String avgRatingsTopicName = envProps.getProperty("output.rating-averages.topic.name");
            KStream<Long, RatingOuterClass.Rating> ratingStream = builder.stream(ratingTopicName,
                    Consumed.with(Serdes.Long(), ratingSerdes()));
            getRatingAverageTable(ratingStream, avgRatingsTopicName, countAndSumSerdes());
    
            return builder.build();
        }
    
    
        private static void preCreateTopics(Properties envProps) throws Exception {
            Map<String, Object> config = new HashMap<>();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            String inputTopic = envProps.getProperty("input.ratings.topic.name");
            String outputTopic = envProps.getProperty("output.rating-averages.topic.name");
            try (AdminClient client = AdminClient.create(config)) {
                Collection<TopicListing> existingTopics = client.listTopics().listings().get();
    
                List<NewTopic> topics = new ArrayList<>();
                List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
                if (!topicNames.contains(inputTopic))
                    topics.add(new NewTopic(
                            inputTopic,
                            Integer.parseInt(envProps.getProperty("input.ratings.topic.partitions")),
                            Short.parseShort(envProps.getProperty("input.ratings.topic.replication.factor"))));
    
                if (!topicNames.contains(outputTopic))
                    topics.add(new NewTopic(
                            outputTopic,
                            Integer.parseInt(envProps.getProperty("output.rating-averages.topic.partitions")),
                            Short.parseShort(envProps.getProperty("output.rating-averages.topic.replication.factor"))));
    
                if (!topics.isEmpty())
                    client.createTopics(topics).all().get();
            }
        }
    
        private Properties createStreamsProperties(Properties envProps) {
            Properties props = new Properties();
            props.putAll(envProps);
    
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, envProps.getProperty("default.topic.replication.factor"));
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, envProps.getProperty("offset.reset.policy"));
    
            return props;
        }
    }
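
    To follow what getRatingAverageTable does without running a cluster, the steps can be imitated in plain Java: re-key each rating by its movie ID, keep one count-and-sum state per key, and emit the new average after every record. Emitting on every record matches the setting CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 in the code above, which disables record caching. This is only a simulation under those assumptions; the class RunningAverageSimulation, its HashMap standing in for the state store, and movie ID 294 are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class RunningAverageSimulation {

    // Mutable per-movie state, playing the role of one CountAndSum entry in the state store.
    private static final class State {
        long count;
        double sum;
    }

    // Stand-in for the KTable<Long, CountAndSum> backing store, keyed by movie ID.
    private final Map<Long, State> stateByMovie = new HashMap<>();

    // Processes one rating event and returns that movie's updated average,
    // mirroring aggregate(...) followed by mapValues(sum / count).
    double process(long movieId, double rating) {
        State state = stateByMovie.computeIfAbsent(movieId, id -> new State());
        state.count += 1;
        state.sum += rating;
        return state.sum / state.count;
    }

    public static void main(String[] args) {
        RunningAverageSimulation sim = new RunningAverageSimulation();
        // Movie 362 carries the sample ratings; movie 294 is made up to show that keys stay independent.
        long[] movieIds = {362L, 362L, 294L, 362L};
        double[] ratings = {9.6, 9.7, 8.0, 8.6};
        for (int i = 0; i < movieIds.length; i++) {
            double average = sim.process(movieIds[i], ratings[i]);
            System.out.println(movieIds[i] + " -> " + average);
        }
    }
}
```

    In the real topology the per-key state lives in a fault-tolerant state store backed by a changelog topic, so the running averages survive restarts; the HashMap here does not.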
    

    7. Write a test producer

    Now create src/main/java/huxihx/kafkastreams/tests/TestProducer.java with the following code:

    package huxihx.kafkastreams.tests;
    
    import huxihx.kafkastreams.proto.RatingOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerializer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class TestProducer {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);
    
            try (final Producer<String, RatingOuterClass.Rating> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, RatingOuterClass.Rating> event =
                        new ProducerRecord<>("ratings", RatingOuterClass.Rating.newBuilder().setMovieId(362).setRating(Double.valueOf(args[0])).build());
                producer.send(event, ((metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }));
            }
        }
    } 

    This test producer takes the movie's rating as a command-line argument.
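
    Note that TestProducer reads args[0] directly, so running it without an argument or with a non-numeric one fails with an exception. If you want a friendlier failure, a validation helper along the following lines could be called first. This is a sketch only: the class RatingArgSketch, the parseRating method, and the 0 to 10 range are assumptions, not part of the original code.

```java
public class RatingArgSketch {

    // Parses the single command-line argument as a rating.
    // The accepted range of 0 to 10 is an assumption made for this example.
    static double parseRating(String[] args) {
        if (args.length != 1) {
            throw new IllegalArgumentException("usage: TestProducer <rating>");
        }
        double rating;
        try {
            rating = Double.parseDouble(args[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("rating is not a number: " + args[0], e);
        }
        if (Double.isNaN(rating) || rating < 0.0 || rating > 10.0) {
            throw new IllegalArgumentException("rating must be between 0 and 10: " + args[0]);
        }
        return rating;
    }

    public static void main(String[] args) {
        System.out.println(parseRating(new String[] {"9.6"}));
    }
}
```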

    8. Test

    First, build the project with the following command:

    $ ./gradlew shadowJar
    

    Then start the Kafka cluster and run the Kafka Streams application:

    $ java -Dconfig.file=configuration/dev.properties -jar build/libs/aggregating-average-standalone-0.0.1.jar
    

    Now open a terminal and start a console consumer:

    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --group test-group --topic rating-averages --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer  

    Because the average rating is a Double, the console consumer must be told to use DoubleDeserializer as the value deserializer.
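
    The reason is that the values in rating-averages are not text: Kafka's Double serde writes each value as 8 bytes in big-endian IEEE 754 form, which is unreadable if printed as a string. The sketch below reproduces that layout with java.nio.ByteBuffer instead of the Kafka classes; the class DoubleBytesSketch and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

public class DoubleBytesSketch {

    // Encodes a double as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] toBytes(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    // Decodes 8 big-endian bytes back into a double, rejecting any other length.
    static double fromBytes(byte[] data) {
        if (data.length != Double.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getDouble();
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(9.65);
        System.out.println("bytes on the wire: " + bytes.length);
        System.out.println("decoded value: " + fromBytes(bytes));
    }
}
```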

    Then open a terminal in the aggregating-average directory and run TestProducer several times to produce movie ratings:

    $ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.6
    $ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.7
    $ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 8.6
    

    Back in the console consumer terminal, you should now see the following output:

    9.6
    9.65
    9.3  

    This shows that the Kafka Streams application correctly computes the movie's average rating in real time.

  • Original article: https://www.cnblogs.com/huxi2b/p/13434600.html