Kafka: ZK + Kafka + Spark Streaming Cluster Setup (Part 29): Pushing Avro Data to a Kafka Topic and Parsing It with Spark Structured Streaming

    Pushing Avro-format data to a Kafka topic

    Source code: https://github.com/Neuw84/structured-streaming-avro-demo/blob/master/src/main/java/es/aconde/structured/GeneratorDemo.java
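
    This generator builds GenericRecord instances against an inline Avro schema, serializes each record to raw Avro binary with Twitter's Bijection library (GenericAvroCodecs.toBinary), and publishes the bytes to the mytopic topic roughly every 100 ms through a producer configured with a byte-array value serializer.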

    package es.aconde.structured;
    
    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.SplittableRandom;
    import java.util.Properties;
    
    /**
     * Fake data generator for Kafka
     *
     * @author Angel Conde
     */
    public class GeneratorDemo {
    
        /**
         * Avro defined schema
         */
        public static final String USER_SCHEMA = "{"
                + "\"type\":\"record\","
                + "\"name\":\"alarm\","
                + "\"fields\":["
                + "  { \"name\":\"str1\", \"type\":\"string\" },"
                + "  { \"name\":\"str2\", \"type\":\"string\" },"
                + "  { \"name\":\"int1\", \"type\":\"int\" }"
                + "]}";
    
        /**
         *
         * @param args
         * @throws InterruptedException
         */
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(USER_SCHEMA);
            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    
            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
            SplittableRandom random = new SplittableRandom();
    
            while (true) {
                GenericData.Record avroRecord = new GenericData.Record(schema);
                avroRecord.put("str1", "Str 1-" + random.nextInt(10));
                avroRecord.put("str2", "Str 2-" + random.nextInt(1000));
                avroRecord.put("int1", random.nextInt(10000));
    
                byte[] bytes = recordInjection.apply(avroRecord);
    
                ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
                producer.send(record);
                Thread.sleep(100);
            }
    
        }
    }
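
    The Injection used above is symmetric: apply serializes a GenericRecord to Avro binary, and invert recovers the record from the bytes. Below is a minimal standalone sketch to sanity-check that round trip without Kafka; the RoundTripCheck class name is mine, and it assumes only the same bijection-avro dependency as the generator.

    package es.aconde.structured;

    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class RoundTripCheck {
        public static void main(String[] args) {
            // reuse the generator's schema so both sides agree on the field layout
            Schema schema = new Schema.Parser().parse(GeneratorDemo.USER_SCHEMA);
            Injection<GenericRecord, byte[]> codec = GenericAvroCodecs.toBinary(schema);

            GenericData.Record rec = new GenericData.Record(schema);
            rec.put("str1", "Str 1-1");
            rec.put("str2", "Str 2-42");
            rec.put("int1", 7);

            byte[] bytes = codec.apply(rec);                // GenericRecord -> Avro binary
            GenericRecord back = codec.invert(bytes).get(); // Avro binary -> GenericRecord
            System.out.println(back);                       // should echo the same field values
        }
    }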

    Consuming the topic and parsing the Avro data with Spark Structured Streaming

    Source code: https://github.com/Neuw84/structured-streaming-avro-demo/blob/master/src/main/java/es/aconde/structured/StructuredDemo.java
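
    The consumer reads the raw bytes from Kafka's value column, registers a deserialize UDF that inverts the same Bijection codec back into a Row, and uses spark-avro's SchemaConverters to derive the matching Spark StructType from the Avro schema. Note that this schema names the record myrecord while the producer used alarm; the record name is not part of the Avro binary encoding, so decoding still works as long as the fields match.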

    package es.aconde.structured;
    
    import com.databricks.spark.avro.SchemaConverters;
    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.log4j.Level;
    import org.apache.log4j.LogManager;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    /**
     * Structured streaming demo using Avro'ed Kafka topic as input
     *
     * @author Angel Conde
     */
    public class StructuredDemo {
    
        private static Injection<GenericRecord, byte[]> recordInjection;
        private static StructType type;
        private static final String USER_SCHEMA = "{"
                + "\"type\":\"record\","
                + "\"name\":\"myrecord\","
                + "\"fields\":["
                + "  { \"name\":\"str1\", \"type\":\"string\" },"
                + "  { \"name\":\"str2\", \"type\":\"string\" },"
                + "  { \"name\":\"int1\", \"type\":\"int\" }"
                + "]}";
        private static Schema.Parser parser = new Schema.Parser();
        private static Schema schema = parser.parse(USER_SCHEMA);
    
        static { //once per VM, lazily
            recordInjection = GenericAvroCodecs.toBinary(schema);
            type = (StructType) SchemaConverters.toSqlType(schema).dataType();
    
        }
    
        public static void main(String[] args) throws StreamingQueryException {
            //set log4j programmatically
            LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
            LogManager.getLogger("akka").setLevel(Level.ERROR);
    
            //configure Spark
            SparkConf conf = new SparkConf()
                    .setAppName("kafka-structured")
                    .setMaster("local[*]");
    
            //initialize spark session
            SparkSession sparkSession = SparkSession
                    .builder()
                    .config(conf)
                    .getOrCreate();
    
            //reduce task number
            sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");
    
            //data stream from kafka
            Dataset<Row> ds1 = sparkSession
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "mytopic")
                    .option("startingOffsets", "earliest")
                    .load();
    
            //register a UDF that deserializes the Avro payload back into a Row
            sparkSession.udf().register("deserialize", (byte[] data) -> {
                GenericRecord record = recordInjection.invert(data).get();
                return RowFactory.create(record.get("str1").toString(), record.get("str2").toString(), record.get("int1"));
    
            }, DataTypes.createStructType(type.fields()));
            ds1.printSchema();
            Dataset<Row> ds2 = ds1
                    .select("value").as(Encoders.BINARY())
                    .selectExpr("deserialize(value) as rows")
                    .select("rows.*");
    
            ds2.printSchema();
    
            StreamingQuery query1 = ds2
                    .groupBy("str1")
                    .count()
                    .writeStream()
                    .queryName("Test query")
                    .outputMode("complete")
                    .format("console")
                    .start();
    
            query1.awaitTermination();
    
        }
    }
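
    On newer Spark versions the manual UDF is unnecessary: the spark-avro module ships a built-in from_avro function that takes the binary column and the schema JSON directly. The following is a hedged sketch of that variant, assuming Spark 3.x with the matching org.apache.spark:spark-avro artifact on the classpath; StructuredAvroBuiltinDemo is a hypothetical class name.

    package es.aconde.structured;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.avro.functions.from_avro;
    import static org.apache.spark.sql.functions.col;

    public class StructuredAvroBuiltinDemo {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("kafka-structured-builtin")
                    .master("local[*]")
                    .getOrCreate();

            // same Kafka source as in StructuredDemo above
            Dataset<Row> raw = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "mytopic")
                    .option("startingOffsets", "earliest")
                    .load();

            // from_avro takes the binary column plus the Avro schema as a JSON string
            Dataset<Row> parsed = raw
                    .select(from_avro(col("value"), GeneratorDemo.USER_SCHEMA).as("rows"))
                    .select("rows.*");

            parsed.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start()
                    .awaitTermination();
        }
    }

    This works here because Bijection writes plain Avro binary with no schema-registry framing; Confluent-encoded messages would first need the magic byte and schema id stripped.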
Original article: https://www.cnblogs.com/yy3b2007com/p/9835005.html