  • Getting Started with Kafka Streams (8)

    1. Background

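    The previous post showed how to use Kafka Streams to compute running sums over a real-time message stream. This post sets up a new scenario: a Kafka topic records movie ticket sales, and we write a Kafka Streams program that computes, for each year, the box-office gross of the highest-grossing movie and of the lowest-grossing movie.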

    2. Feature Overview

    Suppose the messages in the topic we want to compute box-office statistics from look like this:

    {"title":"Avengers: Endgame","release_year":2019,"total_sales":856980506}

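    The message is shown here in JSON form (the program below actually serializes it with Protobuf): title is the movie title, release_year is the year the movie was released, and total_sales is the movie's total box-office gross. From these messages we want to compute, for each year, the gross of the highest-grossing movie and of the lowest-grossing movie.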
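    Before building the streaming version, it may help to see the same computation as a one-shot batch job. The sketch below is a plain-Java analogue, not Kafka Streams code: `BatchMinMax` and its `Sale` class are hypothetical stand-ins for the message above, and it assumes the whole data set is already in memory. It groups sales by release year and lets `IntSummaryStatistics` track the minimum and maximum `total_sales` per year.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical batch analogue of the per-year min/max computation (not Kafka Streams).
public class BatchMinMax {

    // Plain stand-in for one movie-ticket-sales message.
    public static final class Sale {
        public final String title;
        public final int releaseYear;
        public final int totalSales;

        public Sale(String title, int releaseYear, int totalSales) {
            this.title = title;
            this.releaseYear = releaseYear;
            this.totalSales = totalSales;
        }
    }

    // Groups sales by release year (sorted by year) and summarizes total_sales per group.
    public static Map<Integer, IntSummaryStatistics> minMaxByYear(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
                (Sale s) -> s.releaseYear,
                TreeMap::new,
                Collectors.summarizingInt((Sale s) -> s.totalSales)));
    }

    public static void main(String[] args) {
        List<Sale> sales = Arrays.asList(
                new Sale("Avengers: Endgame", 2019, 856980506),
                new Sale("Captain Marvel", 2019, 426829839),
                new Sale("Black Panther", 2018, 700059566),
                new Sale("Deadpool 2", 2018, 324512774));
        minMaxByYear(sales).forEach((year, stats) ->
                System.out.printf("%d: min=%d max=%d%n", year, stats.getMin(), stats.getMax()));
    }
}
```

    The streaming program computes the same per-year minimum and maximum, but incrementally as each message arrives rather than over a finished list.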

    3. Configure the Project

    First, create the project directory:

    $ mkdir aggregate-minmax
    $ cd aggregate-minmax

    Then create the Gradle build file build.gradle in the aggregate-minmax directory with the following content:

    
    

    buildscript {
        repositories {
            jcenter()
        }
        dependencies {
            classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
        }
    }

    plugins {
        id 'java'
        id "com.google.protobuf" version "0.8.10"
    }
    apply plugin: 'com.github.johnrengelman.shadow'

    repositories {
        mavenCentral()
        jcenter()

        maven {
            url 'https://packages.confluent.io/maven'
        }
    }

    group 'huxihx.kafkastreams'

    sourceCompatibility = 1.8
    targetCompatibility = '1.8'
    version = '0.0.1'

    dependencies {
        implementation 'com.google.protobuf:protobuf-java:3.11.4'
        implementation 'org.slf4j:slf4j-simple:1.7.26'
        implementation 'org.apache.kafka:kafka-streams:2.4.0'

        testImplementation group: 'junit', name: 'junit', version: '4.12'
    }

    protobuf {
        // Generated Java sources are written under src/main/java
        generatedFilesBaseDir = "$projectDir/src/"
        protoc {
            artifact = 'com.google.protobuf:protoc:3.11.4'
        }
    }

    jar {
        manifest {
            attributes(
                'Class-Path': configurations.runtimeClasspath.collect { it.getName() }.join(' '),
                'Main-Class': 'huxihx.kafkastreams.AggregatingMinMax'
            )
        }
    }

    shadowJar {
        archiveName = "kstreams-aggregating-minmax-standalone-${version}.${extension}"
    }

    Note that the main class is set to huxihx.kafkastreams.AggregatingMinMax.

    Save the file, then run the following command to generate the Gradle wrapper:

    $ gradle wrapper
    
    BUILD SUCCESSFUL in 2s

    Once that is done, create a subdirectory named configuration under aggregate-minmax to hold the configuration file dev.properties:

    $ mkdir configuration
    $ cd configuration
    $ vi dev.properties

    application.id=aggregating-minmax-app
    bootstrap.servers=localhost:9092

    
    

    input.topic.name=movie-ticket-sales
    input.topic.partitions=1
    input.topic.replication.factor=1

    
    

    output.topic.name=movie-figures-by-year
    output.topic.partitions=1
    output.topic.replication.factor=1

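    Here we configure one input topic and one output topic: the input topic holds the incoming stream of sales messages, and the output topic holds each year's highest and lowest box-office figures.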
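    The main program reads these settings with java.util.Properties and parses the partition and replication values with Integer.parseInt and Short.parseShort. The sketch below shows that parsing step in isolation. It is an illustration under assumptions: `EnvConfigSketch` and `describeTopic` are hypothetical names, and the file contents are inlined as a string (read through a `StringReader`) so the example runs without a file on disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper showing how dev.properties-style settings are parsed.
public class EnvConfigSketch {

    // Loads key=value lines, the same format as configuration/dev.properties.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads <prefix>.name, <prefix>.partitions and <prefix>.replication.factor.
    public static String describeTopic(Properties props, String prefix) {
        String name = props.getProperty(prefix + ".name");
        int partitions = Integer.parseInt(props.getProperty(prefix + ".partitions"));
        short replication = Short.parseShort(props.getProperty(prefix + ".replication.factor"));
        return name + " (partitions=" + partitions + ", replication=" + replication + ")";
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "application.id=aggregating-minmax-app",
                "bootstrap.servers=localhost:9092",
                "input.topic.name=movie-ticket-sales",
                "input.topic.partitions=1",
                "input.topic.replication.factor=1",
                "output.topic.name=movie-figures-by-year",
                "output.topic.partitions=1",
                "output.topic.replication.factor=1"));
        System.out.println(describeTopic(props, "input.topic"));
        System.out.println(describeTopic(props, "output.topic"));
    }
}
```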

    4. Create the Message Schemas

    Next, create the schema for the input topic. From the aggregate-minmax directory, create the folder that holds the schema files:

    $ mkdir -p src/main/proto

    Then create a file named movie-ticket-sales.proto in the proto folder with the following content:

    syntax = "proto3";
       
    package huxihx.kafkastreams.proto;
       
    message MovieTicketSales {
        string title = 1;
        int32 release_year = 2;
        int32 total_sales = 3;
    }

    Next, create the schema for the output topic. Create a file named yearly-movie-figures.proto in the proto folder with the following content:

    syntax = "proto3";
       
    package huxihx.kafkastreams.proto;
       
    message YearlyMovieFigures {
        int32 release_year = 1;
        int32 min_total_sales = 2;
        int32 max_total_sales = 3;
    }

    After saving, run the Gradle wrapper from the aggregate-minmax directory:

    ./gradlew build

    At this point you should see the generated Java classes MovieTicketSalesOuterClass and YearlyMovieFiguresOuterClass under aggregate-minmax/src/main/java/huxihx/kafkastreams/proto.

    5. Create the Serdes

    In this step we create a Serde for each topic's message type. First, create the corresponding directory from the aggregate-minmax directory:

    $ mkdir -p src/main/java/huxihx/kafkastreams/serdes

    Then create ProtobufSerializer.java in the new serdes folder:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.MessageLite;
    import org.apache.kafka.common.serialization.Serializer;
        
    public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            // Follow the Kafka convention: a null value is serialized as null.
            return data == null ? null : data.toByteArray();
        }
    }

    Next comes ProtobufDeserializer.java:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
        
    import java.util.Map;
        
    public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
        
        private Parser<T> parser;
        
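        @Override
        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs, boolean isKey) {
            // The Protobuf parser for T is passed in through the config map under "parser".
            parser = (Parser<T>) configs.get("parser");
        }
        
        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // null payloads (e.g. tombstones) stay null
            }
            try {
                return parser.parseFrom(data);
            } catch (InvalidProtocolBufferException e) {
                throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
            }
        }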
    }

    Finally, ProtobufSerdes.java:

    package huxihx.kafkastreams.serdes;
        
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;
        
    import java.util.HashMap;
    import java.util.Map;
        
    public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
        
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;
        
        public ProtobufSerdes(Parser<T> parser) {
            serializer = new ProtobufSerializer<>();
            deserializer = new ProtobufDeserializer<>();
            Map<String, Parser<T>> config = new HashMap<>();
            config.put("parser", parser);
            deserializer.configure(config, false);
        }
        
        @Override
        public Serializer<T> serializer() {
            return serializer;
        }
        
        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
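    ProtobufSerdes pairs a serializer with a deserializer and hands the deserializer its Protobuf Parser through the config map under the key "parser". The sketch below isolates that wiring without Kafka or Protobuf on the classpath, so it runs on its own. The interfaces `SimpleSerializer` and `SimpleDeserializer` are hypothetical and only loosely shaped like Kafka's Serializer/Deserializer, and a `Function<byte[], T>` stands in for the Protobuf parser. It also shows the common Kafka convention of passing null through unchanged on both sides.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, Kafka-free illustration of the ProtobufSerdes wiring.
public class SerdeWiringSketch {

    // Loosely modeled on Kafka's Serializer/Deserializer; not the real interfaces.
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    interface SimpleDeserializer<T> {
        void configure(Map<String, ?> configs);
        T deserialize(String topic, byte[] data);
    }

    // Stand-in for ProtobufSerializer: encodes a String as UTF-8.
    static final class Utf8Serializer implements SimpleSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for ProtobufDeserializer: the decoding function arrives via the config map.
    static final class ParserDeserializer<T> implements SimpleDeserializer<T> {
        private Function<byte[], T> parser;

        @Override
        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs) {
            parser = (Function<byte[], T>) configs.get("parser");
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            return data == null ? null : parser.apply(data);
        }
    }

    // Builds the pair the way ProtobufSerdes' constructor does, then round-trips a value.
    public static String roundTrip(String value) {
        SimpleSerializer<String> serializer = new Utf8Serializer();
        SimpleDeserializer<String> deserializer = new ParserDeserializer<>();
        Map<String, Function<byte[], String>> config = new HashMap<>();
        config.put("parser", bytes -> new String(bytes, StandardCharsets.UTF_8));
        deserializer.configure(config);
        byte[] wire = serializer.serialize("movie-ticket-sales", value);
        return deserializer.deserialize("movie-ticket-sales", wire);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Avengers: Endgame"));
        System.out.println(roundTrip(null));
    }
}
```

    Passing the parser through configure() rather than a constructor argument mirrors how ProtobufSerdes sets up its deserializer; the Serde wrapper is what lets the parser be supplied once per message type.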

    6. Develop the Main Program

    First, create AggregatingMinMax.java under src/main/java/huxihx/kafkastreams with the following code:

    package huxihx.kafkastreams;
    
    import huxihx.kafkastreams.proto.MovieTicketSalesOuterClass;
    import huxihx.kafkastreams.proto.YearlyMovieFiguresOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerdes;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicListing;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.stream.Collectors;
    
    public class AggregatingMinMax {
    
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
            }
    
            new AggregatingMinMax().runRecipe(args[0]);
        }
    
        private Properties loadEnvProperties(String fileName) throws IOException {
            Properties envProps = new Properties();
            try (FileInputStream input = new FileInputStream(fileName)) {
                envProps.load(input);
            }
            return envProps;
        }
    
        private void runRecipe(final String configPath) throws Exception {
            Properties envProps = this.loadEnvProperties(configPath);
            Properties streamProps = this.createStreamsProperties(envProps);
    
            Topology topology = this.buildTopology(envProps, movieTicketSalesProtobufSerdes(), yearlyMovieFiguresProtobufSerdes());
            this.preCreateTopics(envProps);
    
            final KafkaStreams streams = new KafkaStreams(topology, streamProps);
            final CountDownLatch latch = new CountDownLatch(1);
    
            // Attach shutdown handler to catch Control-C.
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
    
        }
    
        private static ProtobufSerdes<MovieTicketSalesOuterClass.MovieTicketSales> movieTicketSalesProtobufSerdes() {
            return new ProtobufSerdes<>(MovieTicketSalesOuterClass.MovieTicketSales.parser());
        }
    
        private static ProtobufSerdes<YearlyMovieFiguresOuterClass.YearlyMovieFigures> yearlyMovieFiguresProtobufSerdes() {
            return new ProtobufSerdes<>(YearlyMovieFiguresOuterClass.YearlyMovieFigures.parser());
        }
    
        private Topology buildTopology(final Properties envProps,
                                       final ProtobufSerdes<MovieTicketSalesOuterClass.MovieTicketSales> movieTicketSalesProtobufSerdes,
                                       final ProtobufSerdes<YearlyMovieFiguresOuterClass.YearlyMovieFigures> yearlyMovieFiguresProtobufSerdes) {
            final StreamsBuilder builder = new StreamsBuilder();
            final String inputTopic = envProps.getProperty("input.topic.name");
            final String outputTopic = envProps.getProperty("output.topic.name");
    
            builder.stream(inputTopic, Consumed.with(Serdes.String(), movieTicketSalesProtobufSerdes))
                    // Re-key each sale by its release year so all sales of a year fall into one group.
                    .groupBy(
                            (k, v) -> v.getReleaseYear(),
                            Grouped.with(Serdes.Integer(), movieTicketSalesProtobufSerdes))
                    .aggregate(
                            // Initializer: MIN_VALUE/MAX_VALUE sentinels so the first sale sets both max and min.
                            () -> YearlyMovieFiguresOuterClass.YearlyMovieFigures.newBuilder()
                                    .setReleaseYear(0).setMaxTotalSales(Integer.MIN_VALUE).setMinTotalSales(Integer.MAX_VALUE).build(),
                            // Aggregator: fold the new sale into the running min/max for its year.
                            ((key, value, aggregate) ->
                                    YearlyMovieFiguresOuterClass.YearlyMovieFigures.newBuilder().setReleaseYear(key)
                                            .setMinTotalSales(Math.min(value.getTotalSales(), aggregate.getMinTotalSales()))
                                            .setMaxTotalSales(Math.max(value.getTotalSales(), aggregate.getMaxTotalSales()))
                                            .build()),
                            Materialized.with(Serdes.Integer(), yearlyMovieFiguresProtobufSerdes))
                    .toStream()
                    .to(outputTopic, Produced.with(Serdes.Integer(), yearlyMovieFiguresProtobufSerdes));
    
            return builder.build();
        }
    
    
        private static void preCreateTopics(Properties envProps) throws Exception {
            Map<String, Object> config = new HashMap<>();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            String inputTopic = envProps.getProperty("input.topic.name");
            String outputTopic = envProps.getProperty("output.topic.name");
            try (AdminClient client = AdminClient.create(config)) {
                Collection<TopicListing> existingTopics = client.listTopics().listings().get();
    
                List<NewTopic> topics = new ArrayList<>();
                List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
                if (!topicNames.contains(inputTopic))
                    topics.add(new NewTopic(
                            inputTopic,
                            Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                            Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
    
                if (!topicNames.contains(outputTopic))
                    topics.add(new NewTopic(
                            outputTopic,
                            Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                            Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
    
                if (!topics.isEmpty())
                    client.createTopics(topics).all().get();
            }
        }
    
        private Properties createStreamsProperties(Properties envProps) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // Disable record caching so every aggregate update is forwarded downstream immediately.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            return props;
        }
    }
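    To see why the aggregation emits the sequence shown in the testing section, the initializer and aggregator in buildTopology can be replayed by hand. The sketch below is a plain-Java simulation, not Kafka Streams: `Figures` is a hypothetical stand-in for the generated YearlyMovieFigures class, and a `HashMap` plays the role of the state store. Because the program sets cache.max.bytes.buffering to 0, the sketch assumes every update is forwarded downstream and therefore records one output per input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java replay of the min/max aggregation (no Kafka involved).
public class MinMaxAggregationSketch {

    // Stand-in for YearlyMovieFigures; not the generated Protobuf class.
    public static final class Figures {
        public final int releaseYear;
        public final int minTotalSales;
        public final int maxTotalSales;

        public Figures(int releaseYear, int minTotalSales, int maxTotalSales) {
            this.releaseYear = releaseYear;
            this.minTotalSales = minTotalSales;
            this.maxTotalSales = maxTotalSales;
        }

        @Override
        public String toString() {
            return "release_year=" + releaseYear + " min=" + minTotalSales + " max=" + maxTotalSales;
        }
    }

    // Mirrors the initializer: sentinels that any real value will replace.
    public static Figures initial() {
        return new Figures(0, Integer.MAX_VALUE, Integer.MIN_VALUE);
    }

    // Mirrors the aggregator: fold one sale into the running figures for its year.
    public static Figures aggregate(int year, int totalSales, Figures agg) {
        return new Figures(year,
                Math.min(totalSales, agg.minTotalSales),
                Math.max(totalSales, agg.maxTotalSales));
    }

    // Replays {year, totalSales} events in order and records every updated aggregate.
    public static List<Figures> replay(int[][] events) {
        Map<Integer, Figures> store = new HashMap<>();
        List<Figures> updates = new ArrayList<>();
        for (int[] e : events) {
            Figures next = aggregate(e[0], e[1], store.getOrDefault(e[0], initial()));
            store.put(e[0], next);
            updates.add(next);
        }
        return updates;
    }

    public static void main(String[] args) {
        int[][] events = {
                {2019, 856980506}, {2019, 426829839}, {2019, 401486230}, {2019, 385082142},
                {2018, 700059566}, {2018, 678815482}, {2018, 324512774},
                {2017, 517218368}, {2017, 412563408}, {2017, 517218368}};
        for (Figures f : replay(events)) {
            System.out.println(f);
        }
    }
}
```

    The events array uses the same years and totals as the TestProducer data, so the printed sequence can be compared line by line with the consumer output in the testing section.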

    7. Write a Test Producer and Consumer

    As in the earlier posts in this series, we write a TestProducer class at src/main/java/huxihx/kafkastreams/tests/TestProducer.java:

    package huxihx.kafkastreams.tests;
    
    import huxihx.kafkastreams.proto.MovieTicketSalesOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerializer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class TestProducer {
        private static final List<MovieTicketSalesOuterClass.MovieTicketSales> TEST_EVENTS = Arrays.asList(
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2019).setTitle("Avengers: Endgame").setTotalSales(856980506).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2019).setTitle("Captain Marvel").setTotalSales(426829839).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2019).setTitle("Toy Story 4").setTotalSales(401486230).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2019).setTitle("The Lion King").setTotalSales(385082142).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2018).setTitle("Black Panther").setTotalSales(700059566).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2018).setTitle("Avengers: Infinity War").setTotalSales(678815482).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2018).setTitle("Deadpool 2").setTotalSales(324512774).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2017).setTitle("Beauty and the Beast").setTotalSales(517218368).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2017).setTitle("Wonder Woman").setTotalSales(412563408).build(),
                MovieTicketSalesOuterClass.MovieTicketSales.newBuilder()
                        .setReleaseYear(2017).setTitle("Star Wars Ep. VIII: The Last Jedi").setTotalSales(517218368).build());
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ProtobufSerializer<MovieTicketSalesOuterClass.MovieTicketSales>().getClass());
    
            try (final Producer<String, MovieTicketSalesOuterClass.MovieTicketSales> producer = new KafkaProducer<>(props)) {
                TEST_EVENTS.stream().map(event ->
                        new ProducerRecord<String, MovieTicketSalesOuterClass.MovieTicketSales>("movie-ticket-sales", event)).forEach(producer::send);
            }
        }
    
    }

    Next is TestConsumer.java:

    package huxihx.kafkastreams.tests;
    
    import com.google.protobuf.Parser;
    import huxihx.kafkastreams.proto.YearlyMovieFiguresOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufDeserializer;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class TestConsumer {
        public static void main(String[] args) {
            Deserializer<YearlyMovieFiguresOuterClass.YearlyMovieFigures> deserializer = new ProtobufDeserializer<>();
            Map<String, Parser<YearlyMovieFiguresOuterClass.YearlyMovieFigures>> config = new HashMap<>();
            config.put("parser", YearlyMovieFiguresOuterClass.YearlyMovieFigures.parser());
            deserializer.configure(config, false);
    
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group01");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            // Keys in the output topic are written with Serdes.Integer(), so read them as Integer.
            try (final Consumer<Integer, YearlyMovieFiguresOuterClass.YearlyMovieFigures> consumer =
                         new KafkaConsumer<>(props, new IntegerDeserializer(), deserializer)) {
                consumer.subscribe(Arrays.asList("movie-figures-by-year"));
                while (true) {
                    ConsumerRecords<Integer, YearlyMovieFiguresOuterClass.YearlyMovieFigures> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<Integer, YearlyMovieFiguresOuterClass.YearlyMovieFigures> record : records) {
                        System.out.printf("value = %s%n", record.value());
                    }
                }
            }
        }
    }
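    The Streams application writes the output topic with Serdes.Integer() keys. Kafka's IntegerSerializer encodes an int as four big-endian bytes, so a consumer that decodes those keys as text gets unreadable characters rather than "2019"; they should be read with IntegerDeserializer. The sketch below reproduces that byte layout with java.nio.ByteBuffer (big-endian by default) to show the difference. `IntegerKeySketch` and its methods are hypothetical names, and the sketch does not call Kafka itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of how an Integer key looks on the wire (no Kafka calls).
public class IntegerKeySketch {

    // Four big-endian bytes, the same layout Kafka's IntegerSerializer writes.
    public static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Reverses encode(); rejects payloads that are not exactly four bytes.
    public static int decode(byte[] bytes) {
        if (bytes.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] key = encode(2019);
        System.out.println("decoded as int: " + decode(key));
        // Decoding the same bytes as UTF-8 text does not yield "2019".
        System.out.println("text matches \"2019\": " + "2019".equals(new String(key, StandardCharsets.UTF_8)));
    }
}
```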

    8. Testing

    First, build the project by running the following command in the aggregate-minmax directory:

    $ ./gradlew shadowJar

    Then start the Kafka cluster and run the Kafka Streams application:

    $ java -jar build/libs/kstreams-aggregating-minmax-standalone-0.0.1.jar configuration/dev.properties

    Now open a terminal and start TestConsumer:

    $ java -cp build/libs/kstreams-aggregating-minmax-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestConsumer

    Open another terminal and run TestProducer:

    $ java -cp build/libs/kstreams-aggregating-minmax-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer

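    If everything works, you should see the output below. Each message carries the running highest and lowest box-office figures for its year, updated in real time: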

    value = release_year: 2019
    min_total_sales: 856980506
    max_total_sales: 856980506
    
    value = release_year: 2019
    min_total_sales: 426829839
    max_total_sales: 856980506
    
    value = release_year: 2019
    min_total_sales: 401486230
    max_total_sales: 856980506
    
    value = release_year: 2019
    min_total_sales: 385082142
    max_total_sales: 856980506
    
    value = release_year: 2018
    min_total_sales: 700059566
    max_total_sales: 700059566
    
    value = release_year: 2018
    min_total_sales: 678815482
    max_total_sales: 700059566
    
    value = release_year: 2018
    min_total_sales: 324512774
    max_total_sales: 700059566
    
    value = release_year: 2017
    min_total_sales: 517218368
    max_total_sales: 517218368
    
    value = release_year: 2017
    min_total_sales: 412563408
    max_total_sales: 517218368
    
    value = release_year: 2017
    min_total_sales: 412563408
    max_total_sales: 517218368
  • Original article: https://www.cnblogs.com/huxi2b/p/12596877.html