  • Getting Started with Kafka Streams Development (7)

    1. Background

    The previous post showed how to use Kafka Streams to detect and filter out duplicate messages in a real-time stream. This post shows how to sum up a particular field across the messages in a stream.

    2. Feature Overview

    Suppose the events we want to aggregate look like this:

    {"title":"Die Hard","sale_ts":"2019-07-18T10:00:00Z","ticket_total_value":12}

    Each event represents a movie ticket sale, and ticket_total_value is the ticket price. We want to compute, in real time, the running box-office total for each movie.
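
    For example, with the sample data used later in this post, three "Die Hard" sales arrive with ticket_total_value 12, 12 and 24, and the application emits the updated running total for that title after each sale:

    Die Hard	12
    Die Hard	24
    Die Hard	48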

    3. Project Setup

    First, create the project directory:

    $ mkdir aggregate-sum
    $ cd aggregate-sum
    

    Then create the Gradle build file build.gradle under the aggregate-sum directory with the following content:

    repositories {
        mavenCentral()
        jcenter()
    
        maven {
            url 'http://packages.confluent.io/maven'
        }
    }
    
    group 'huxihx.kafkastreams'
    
    sourceCompatibility = 1.8
    targetCompatibility = '1.8'
    version = '0.0.1'
    
    dependencies {
        implementation 'com.google.protobuf:protobuf-java:3.0.0'
        implementation 'org.slf4j:slf4j-simple:1.7.26'
        implementation 'org.apache.kafka:kafka-streams:2.3.0'
    
        testCompile group: 'junit', name: 'junit', version: '4.12'
    }
    
    protobuf {
        generatedFilesBaseDir = "$projectDir/src/"
        protoc {
            artifact = 'com.google.protobuf:protoc:3.0.0'
        }
    }
    
    jar {
        manifest {
            attributes(
                    'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
                    'Main-Class': 'huxihx.kafkastreams.AggregatingSum'
            )
        }
    }
    
    shadowJar {
        archiveName = "kstreams-aggregating-sum-standalone-${version}.${extension}"
    }
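
    Note that the protobuf {} and shadowJar {} blocks above only work if the corresponding Gradle plugins are applied. The plugin declarations are not shown in the listing above; a typical setup (an assumption on my part, consistent with the earlier posts in this series) would add something like the following at the top of build.gradle:

    buildscript {
        repositories {
            jcenter()
        }
        dependencies {
            // Assumed plugin versions: Protobuf code generation and Shadow (fat-jar) plugins
            classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.10'
            classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
        }
    }
    
    apply plugin: 'java'
    apply plugin: 'com.google.protobuf'
    apply plugin: 'com.github.johnrengelman.shadow'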

    Note that the main class we configured is huxihx.kafkastreams.AggregatingSum.

    Save the file, then run the following command to generate the Gradle wrapper:

    $ gradle wrapper
    

    After that, create a subdirectory named configuration under aggregate-sum to hold the configuration file dev.properties:

    $ mkdir configuration
    

    The contents of configuration/dev.properties are:

    application.id=agg-sum-app
    bootstrap.servers=localhost:9092
    
    input.topic.name=movie-ticket-sales
    input.topic.partitions=1
    input.topic.replication.factor=1
    
    output.topic.name=movie-revenue
    output.topic.partitions=1
    output.topic.replication.factor=1
    

    Here we configure one input topic and one output topic, which hold the incoming message stream and the new stream of summed totals, respectively.

    4. Create the Message Schema

    Next, create the schema for the topic messages. Under aggregate-sum, run the following command to create the folder that holds the schema:

    $ mkdir -p src/main/proto
    

    Then, under the proto folder, create a file named ticket-sale.proto with the following content:

    syntax = "proto3";

    package huxihx.kafkastreams.proto;

    message TicketSale {
        string title = 1;
        string sale_ts = 2;
        int32 ticket_total_value = 3;
    }

    After saving the file, run gradlew build in the aggregate-sum directory:

    $ ./gradlew build
    

    You should now see the generated Java class TicketSaleOuterClass under aggregate-sum/src/main/java/huxihx/kafkastreams/proto.
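
    As a quick sanity check (a minimal sketch; the class name TicketSaleQuickCheck and the sample values are illustrative, not part of the project), the generated builder and accessors can be exercised like this:

    package huxihx.kafkastreams;

    import huxihx.kafkastreams.proto.TicketSaleOuterClass;

    public class TicketSaleQuickCheck {
        public static void main(String[] args) {
            // Build a TicketSale event with the builder generated from ticket-sale.proto.
            TicketSaleOuterClass.TicketSale sale = TicketSaleOuterClass.TicketSale.newBuilder()
                    .setTitle("Die Hard")
                    .setSaleTs("2019-07-18T10:00:00Z")
                    .setTicketTotalValue(12)
                    .build();
            System.out.println(sale.getTitle() + " -> " + sale.getTicketTotalValue());
        }
    }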

    5. Create the Serdes

    In this step we create the Serdes for the topic messages. First, run the following command under aggregate-sum to create the package directory:

    $ mkdir -p src/main/java/huxihx/kafkastreams/serdes
    

    Then create ProtobufSerializer.java in the newly created serdes folder:

    package huxihx.kafkastreams.serdes;
       
    import com.google.protobuf.MessageLite;
    import org.apache.kafka.common.serialization.Serializer;
       
    public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            return data == null ? new byte[0] : data.toByteArray();
        }
    }
    

    Next is ProtobufDeserializer.java:

    package huxihx.kafkastreams.serdes;
       
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
       
    import java.util.Map;
       
    public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
       
        private Parser<T> parser;
       
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            parser = (Parser<T>) configs.get("parser");
        }
       
        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                return parser.parseFrom(data);
            } catch (InvalidProtocolBufferException e) {
                throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
            }
        }
    }
    

    And finally ProtobufSerdes.java:

    package huxihx.kafkastreams.serdes;
       
    import com.google.protobuf.MessageLite;
    import com.google.protobuf.Parser;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;
       
    import java.util.HashMap;
    import java.util.Map;
       
    public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
       
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;
       
        public ProtobufSerdes(Parser<T> parser) {
            serializer = new ProtobufSerializer<>();
            deserializer = new ProtobufDeserializer<>();
            Map<String, Parser<T>> config = new HashMap<>();
            config.put("parser", parser);
            deserializer.configure(config, false);
        }
       
        @Override
        public Serializer<T> serializer() {
            return serializer;
        }
       
        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
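
    To make the "parser" wiring concrete, here is a minimal sketch (the class name ProtobufSerdesQuickCheck and the sample values are illustrative, not part of the project) that round-trips a TicketSale through ProtobufSerdes:

    package huxihx.kafkastreams;

    import huxihx.kafkastreams.proto.TicketSaleOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerdes;

    public class ProtobufSerdesQuickCheck {
        public static void main(String[] args) {
            // The constructor hands the generated parser to the deserializer via the "parser" config entry.
            ProtobufSerdes<TicketSaleOuterClass.TicketSale> serdes =
                    new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());

            TicketSaleOuterClass.TicketSale sale = TicketSaleOuterClass.TicketSale.newBuilder()
                    .setTitle("The Godfather")
                    .setSaleTs("2019-07-18T10:00:00Z")
                    .setTicketTotalValue(18)
                    .build();

            // Serialize to bytes and deserialize back, as Kafka Streams will do for the input topic.
            byte[] bytes = serdes.serializer().serialize("movie-ticket-sales", sale);
            TicketSaleOuterClass.TicketSale copy = serdes.deserializer().deserialize("movie-ticket-sales", bytes);
            System.out.println(copy.getTitle() + " -> " + copy.getTicketTotalValue());
        }
    }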
    

    6. Develop the Main Logic

    First, create AggregatingSum.java under src/main/java/huxihx/kafkastreams with the following code:

    package huxihx.kafkastreams;
    
    import huxihx.kafkastreams.proto.TicketSaleOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerdes;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicListing;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.stream.Collectors;
    
    public class AggregatingSum {
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                throw new IllegalArgumentException(
                        "This program takes one argument: the path to an environment configuration file.");
            }
            new AggregatingSum().runRecipe(args[0]);
        }
    
        private void runRecipe(final String configPath) throws Exception {
            Properties envProps = this.loadEnvProperties(configPath);
            Properties streamProps = this.createStreamsProperties(envProps);
    
            Topology topology = this.buildTopology(envProps, ticketSaleProtobufSerdes());
            this.preCreateTopics(envProps);
    
            final KafkaStreams streams = new KafkaStreams(topology, streamProps);
            final CountDownLatch latch = new CountDownLatch(1);
    
            // Attach shutdown handler to catch Control-C.
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
    
        }
    
        private Properties createStreamsProperties(Properties envProps) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            return props;
        }
    
        private void preCreateTopics(Properties envProps) throws Exception {
            Map<String, Object> config = new HashMap<>();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
            String inputTopic = envProps.getProperty("input.topic.name");
            String outputTopic = envProps.getProperty("output.topic.name");
            try (AdminClient client = AdminClient.create(config)) {
                Collection<TopicListing> existingTopics = client.listTopics().listings().get();
    
                List<NewTopic> topics = new ArrayList<>();
                List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
                if (!topicNames.contains(inputTopic))
                    topics.add(new NewTopic(
                            inputTopic,
                            Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                            Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
    
                if (!topicNames.contains(outputTopic))
                    topics.add(new NewTopic(
                            outputTopic,
                            Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                            Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
    
                if (!topics.isEmpty())
                    client.createTopics(topics).all().get();
            }
        }
    
        private Properties loadEnvProperties(String fileName) throws IOException {
            Properties envProps = new Properties();
            try (FileInputStream input = new FileInputStream(fileName)) {
                envProps.load(input);
            }
            return envProps;
        }
    
        private Topology buildTopology(Properties envProps, final ProtobufSerdes<TicketSaleOuterClass.TicketSale> ticketSaleSerde) {
            final StreamsBuilder builder = new StreamsBuilder();
            final String inputTopic = envProps.getProperty("input.topic.name");
            final String outputTopic = envProps.getProperty("output.topic.name");
    
            builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
                    // Re-key each sale by its movie title and keep only the ticket price as the value.
                    .map((k, v) -> new KeyValue<>(v.getTitle(), v.getTicketTotalValue()))
                    // Group by title so that sums are computed per movie (this triggers a repartition).
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                    // Sum the ticket prices; the result is a KTable holding the running total per title.
                    .reduce(Integer::sum)
                    // Emit every update of the running total to the output topic.
                    .toStream()
                    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
            return builder.build();
        }
    
        private static ProtobufSerdes<TicketSaleOuterClass.TicketSale> ticketSaleProtobufSerdes() {
            return new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());
        }
    }
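
    Before running against a real cluster, the topology can be exercised with Kafka Streams' TopologyTestDriver. The sketch below is an illustration only: it assumes the org.apache.kafka:kafka-streams-test-utils:2.3.0 test dependency is added to build.gradle (it is not listed above), that buildTopology is made accessible to the test (for example package-private), and the class name AggregatingSumTopologyCheck is hypothetical:

    package huxihx.kafkastreams;

    import huxihx.kafkastreams.proto.TicketSaleOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerdes;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    import java.util.Properties;

    public class AggregatingSumTopologyCheck {
        public static void main(String[] args) {
            // Topic names read by buildTopology, plus dummy Streams configs for the test driver.
            Properties envProps = new Properties();
            envProps.setProperty("input.topic.name", "movie-ticket-sales");
            envProps.setProperty("output.topic.name", "movie-revenue");

            Properties streamProps = new Properties();
            streamProps.setProperty("application.id", "agg-sum-topology-check");
            streamProps.setProperty("bootstrap.servers", "dummy:1234");
            streamProps.setProperty("cache.max.bytes.buffering", "0");

            ProtobufSerdes<TicketSaleOuterClass.TicketSale> serdes =
                    new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());

            try (TopologyTestDriver driver =
                         new TopologyTestDriver(new AggregatingSum().buildTopology(envProps, serdes), streamProps)) {
                ConsumerRecordFactory<String, TicketSaleOuterClass.TicketSale> factory =
                        new ConsumerRecordFactory<>("movie-ticket-sales", new StringSerializer(), serdes.serializer());

                // Pipe two "Die Hard" sales through the topology.
                driver.pipeInput(factory.create("movie-ticket-sales", (String) null,
                        TicketSaleOuterClass.TicketSale.newBuilder()
                                .setTitle("Die Hard").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(12).build()));
                driver.pipeInput(factory.create("movie-ticket-sales", (String) null,
                        TicketSaleOuterClass.TicketSale.newBuilder()
                                .setTitle("Die Hard").setSaleTs("2019-07-18T10:01:00Z").setTicketTotalValue(24).build()));

                // Expect two updates of the running total: 12, then 36.
                ProducerRecord<String, Integer> first =
                        driver.readOutput("movie-revenue", new StringDeserializer(), new IntegerDeserializer());
                ProducerRecord<String, Integer> second =
                        driver.readOutput("movie-revenue", new StringDeserializer(), new IntegerDeserializer());
                System.out.println(first.key() + " -> " + first.value());   // Die Hard -> 12
                System.out.println(second.key() + " -> " + second.value()); // Die Hard -> 36
            }
        }
    }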
    

    7. Write the Test Producer

    As in the earlier posts in this series, we write a TestProducer class at src/main/java/huxihx/kafkastreams/tests/TestProducer.java. Since the topic values are Protobuf-encoded bytes, the console producer is not convenient here, so we send the test events from a small Java producer. The content is as follows:

    package huxihx.kafkastreams.tests;
    
    import huxihx.kafkastreams.proto.TicketSaleOuterClass;
    import huxihx.kafkastreams.serdes.ProtobufSerializer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class TestProducer {
    
        private static final List<TicketSaleOuterClass.TicketSale> TEST_EVENTS = Arrays.asList(
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(12).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:01:00Z").setTicketTotalValue(12).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:01:31Z").setTicketTotalValue(12).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:01:36Z").setTicketTotalValue(24).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(18).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Big Lebowski").setSaleTs("2019-07-18T10:40:00Z").setTicketTotalValue(12).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Big Lebowski").setSaleTs("2019-07-18T10:50:00Z").setTicketTotalValue(12).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:550Z").setTicketTotalValue(36).build(),
                TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:34Z").setTicketTotalValue(18).build()
        );
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ProtobufSerializer<TicketSaleOuterClass.TicketSale>().getClass());
    
            try (final Producer<String, TicketSaleOuterClass.TicketSale> producer = new KafkaProducer<>(props)) {
                TEST_EVENTS.stream().map(event ->
                        new ProducerRecord<String, TicketSaleOuterClass.TicketSale>("movie-ticket-sales", event)).forEach(producer::send);
            }
        }
    }
    

    8. Testing

    First, build the project:

    $ ./gradlew shadowJar 
    

    Then start the Kafka cluster and run the Kafka Streams application:

    $ java -jar build/libs/kstreams-aggregating-sum-standalone-0.0.1.jar configuration/dev.properties
    

    Now open a terminal and run the test producer:

    $ java -cp build/libs/kstreams-aggregating-sum-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer
    

    Then open another terminal and run the console consumer to inspect the stream of aggregated totals:

    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic movie-revenue --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
    
    Die Hard	12
    Die Hard	24
    The Godfather	12
    Die Hard	48
    The Godfather	30
    The Big Lebowski	12
    The Big Lebowski	24
    The Godfather	66
    The Godfather	84
    

    If everything works, you should see the output above: the application maintains a running box-office total for each movie in real time. Because record caching is disabled (cache.max.bytes.buffering is set to 0 in the Streams configuration), every single update to the underlying KTable is forwarded to the output topic, which is why each title appears multiple times with a growing total.

  • Original post: https://www.cnblogs.com/huxi2b/p/12255840.html