Kafka: ZK + Kafka + Spark Streaming cluster setup (part 11) — sending custom Avro-format records to a Kafka topic and reading them back with Structured Streaming

    Sending Avro-format data to a Kafka topic

    Step 1: define the Avro schema

    {  
          "type": "record",  
          "name": "userlog",  
          "fields": [
                {"name": "ip","type": "string"},
                {"name": "identity","type":"string"},
                {"name": "userid","type":"int"},
                {"name": "time","type": "string"},
                {"name": "requestinfo","type": "string"},
                {"name": "state","type": "int"},
                {"name": "responce","type": "string"},
                {"name": "referer","type": "string"},
                {"name": "useragent","type": "string"},
                {"name": "timestamp","type": "long"}
          ]  
    }

    Create an Avro schema file named userlog.avsc with the content above.

    The schema contains the fields ip:string, identity:string, userid:int, time:string, requestinfo:string, state:int, responce:string, referer:string, useragent:string and timestamp:long, which together describe a web request log entry.
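
    As an aside (not part of the original post), the same record schema can also be built in code with Avro's SchemaBuilder instead of a separate .avsc file. The sketch below is a hypothetical helper class (the name UserLogSchema is made up); it assumes Avro 1.7.5+ on the classpath and mirrors the field names of userlog.avsc exactly:

    package com.dx.streaming.producer;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    // Hypothetical alternative to loading userlog.avsc: build the equivalent schema programmatically.
    public class UserLogSchema {
        public static Schema build() {
            return SchemaBuilder.record("userlog").fields()
                    .requiredString("ip")
                    .requiredString("identity")
                    .requiredInt("userid")
                    .requiredString("time")
                    .requiredString("requestinfo")
                    .requiredInt("state")
                    .requiredString("responce")
                    .requiredString("referer")
                    .requiredString("useragent")
                    .requiredLong("timestamp")
                    .endRecord();
        }
    }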

    Step 2: create a producer object that sends data to the topic

    To send data to Kafka, we must create a producer object through the Kafka API, which is used to produce data to Kafka:

        private static Producer<String, byte[]> createProducer() {
            Properties props = new Properties();
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            // declare the Kafka broker list
            props.put("bootstrap.servers", "192.168.0.121:9092");
            Producer<String, byte[]> procuder = new KafkaProducer<String, byte[]>(props);
            return procuder;
        }

    This requires the Kafka client jar: kafka-clients-0.10.0.1.jar.

    Step 3: parse the Avro schema file into a Schema object and create a record (GenericRecord) from it

    Parsing the Avro schema file into a Schema object requires avro-1.7.5.jar.

    Here we define a SchemaUtil.java class that provides a getAvroSchemaFromHDFSFile method, which reads the .avsc file from HDFS and parses it into a Schema object.

    package com.dx.streaming.producer;
    
    import java.io.IOException;
    import java.io.InputStream;
    
    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class SchemaUtil {
        public static Schema getAvroSchemaFromHDFSFile(String hdfsAvroFile) throws Exception {
            InputStream inputStream;
            Path pt = new Path(hdfsAvroFile);        
            Schema schema = null;
            FileSystem fs =null;
            
            try {
                fs = FileSystem.get(new Configuration());
                if (!fs.exists(pt)) {
                    throw new Exception(pt + " file does not exist");
                }
                
                inputStream = fs.open(pt);
                Schema.Parser parser = new Schema.Parser();
                schema = parser.parse(inputStream);
            } catch (IOException e) {
                e.printStackTrace();
                throw e;
            } finally {
                if(fs!=null){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            return schema;
        }
    }

    Next, create a record (GenericRecord) from the Schema object; the record holds the actual data to be produced:

                Random random = new Random();
                String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
                String identity = UUID.randomUUID().toString();
                int userid = random.nextInt(100);
    
                SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
                Date date= new Date();
                String yyyyMMdd    =dfs.format(date);            
                String time = yyyyMMdd+ random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
                String requestInfo = "....";
                int state = random.nextInt(600);
                String responce = "...";
                String referer = "...";
                String useragent = "...";
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
                GenericRecord record = new GenericData.Record(schema);
                record.put("ip", ip);
                record.put("identity", identity);
                record.put("userid", userid);
                record.put("time", time);
                record.put("requestinfo", requestInfo);
                record.put("state", state);
                record.put("responce", responce);
                record.put("referer", referer);
                record.put("useragent", useragent);
                record.put("timestamp", format.parse(time).getTime());

    Note: the code above creates a GenericRecord according to the schema; the GenericRecord holds the actual data.

    The record can then be converted to a byte[] with an Injection&lt;GenericRecord, byte[]&gt; object, which makes it easier to transmit when producing data:

    String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    byte[] bytes = recordInjection.apply(record);

    On the consumer side, the same kind of Injection&lt;GenericRecord, byte[]&gt; object can be used to decode the received byte[] back into a GenericRecord:

            Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
            props.put("group.id", "testgroup");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    
            KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    
            consumer.subscribe(Collections.singletonList("topic name"));
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse("avro schema file path");
            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    
            try {
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    for (ConsumerRecord<String, byte[]> record : records) {
                        GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                        String info = String.format("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("str1"));
                        logger.info(info);
                    }
                }
            } finally {
                consumer.close();
            }

    Note: for more details, see part (13) of this series (Kafka: ZK + Kafka + Spark Streaming cluster setup), which defines an Avro schema and shows how the Avro byte stream is sent and parsed between producer and consumer.

    Step 4: send data to the topic through the producer

    To send the byte[] data to Kafka: first create a producer object through the Kafka API, convert the record above to a byte[], and then call the producer's send method:

    Producer<String, byte[]> procuder = createProducer();
    // Build the Schema object from the Avro schema file.
    // Create a record from the schema and store the data in it.
    // Build the Injection<GenericRecord, byte[]> converter that turns the record into a byte[].
    try {
        byte[] bytes = recordInjection.apply(record);
    
        ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
        procuder.send(msg);
    } catch (Exception e) {
        e.printStackTrace();
    }

    The four steps above outline how to turn the data to be produced into a record, convert the record to a byte[], and send it to Kafka. The code below is a complete implementation:

    package com.dx.streaming.producer;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Properties;
    import java.util.Random;
    import java.util.UUID;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;
    
    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    
    public class TestProducer {
        private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
        //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
        private static final String topic = "t-my";
    
        public static void main(String[] args) throws Exception {
            int size = 0;
            String appName = "Test Avro";
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(appName);
            SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
    
            Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
    
            Producer<String, byte[]> procuder = createProducer();
            while (true) {
                Random random = new Random();
                String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
                String identity = UUID.randomUUID().toString();
                int userid = random.nextInt(100);
    
                SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
                Date date= new Date();
                String yyyyMMdd    =dfs.format(date);            
                String time = yyyyMMdd+ random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
                String requestInfo = "....";
                int state = random.nextInt(600);
                String responce = "...";
                String referer = "...";
                String useragent = "...";
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
                GenericRecord record = new GenericData.Record(schema);
                record.put("ip", ip);
                record.put("identity", identity);
                record.put("userid", userid);
                record.put("time", time);
                record.put("requestinfo", requestInfo);
                record.put("state", state);
                record.put("responce", responce);
                record.put("referer", referer);
                record.put("useragent", useragent);
                record.put("timestamp", format.parse(time).getTime());
    
                System.out.println("ip:" + ip + ",identity:" + identity + ",userid:" + userid + ",time:" + time + ",timestamp:" + format.parse(time).getTime() + "\n");
    
                try {
                    byte[] bytes = recordInjection.apply(record);
    
                    ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
                    procuder.send(msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                size++;
    
                if (size % 100 == 0) {
                    Thread.sleep(100);
                    if (size > 1000) {
                        break;
                    }
                }
            }
    
            // list the topic's partition information
            List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
            partitions = procuder.partitionsFor(topic);
            for (PartitionInfo p : partitions) {
                System.out.println(p);
            }
    
            System.out.println("send message over.");
            procuder.close(100, java.util.concurrent.TimeUnit.MILLISECONDS);
        }
    
        private static Producer<String, byte[]> createProducer() {
            Properties props = new Properties();
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            // declare the Kafka broker list
            props.put("bootstrap.servers", "192.168.0.121:9092");
            Producer<String, byte[]> procuder = new KafkaProducer<String, byte[]>(props);
            return procuder;
        }
    }

    At this point the pom.xml configuration is as follows:

            <dependency>
                <groupId>com.twitter</groupId>
                <artifactId>bijection-avro_2.11</artifactId>
                <version>0.9.5</version>
            </dependency>
    
            <dependency>
                <groupId>com.databricks</groupId>
                <artifactId>spark-avro_2.11</artifactId>
                <version>3.2.0</version>
                <type>jar</type>
            </dependency>        
            
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>

    Note: a few of the dependencies above are not strictly required by the code so far, but they are needed by the Structured Streaming side below.

    Sample console output from the test:

    ip:229:21:203:40,identity:ae6fde10-4687-4682-a760-d9076892eb45,userid:9,time:2018-07-12 12:57:24,timestamp:1531371444000
    ip:105:224:103:61,identity:edef8c93-da4e-46d4-bfd3-551b74e6f4df,userid:1,time:2018-07-12 23:57:23,timestamp:1531411043000
    ip:252:230:234:213,identity:80e00a81-f6dd-4bf6-93a1-95154babdd08,userid:59,time:2018-07-12 9:36:37,timestamp:1531359397000
    ip:76:63:136:50,identity:630b66fb-95d7-4c63-a638-6f24396987d0,userid:33,time:2018-07-12 19:18:18,timestamp:1531394298000
    Partition(topic = t-my, partition = 0, leader = 0, replicas = [0,], isr = [0,]
    send message over.

    Reading the Kafka data with Structured Streaming

    Notes:

    The code below uses the Structured Streaming programming model, not the Spark Streaming (DStream) model.

    The two differ in the APIs they expose and, to some extent, in how they work internally, so you should be clear about which one you are using; the sketch below shows the two entry points side by side.
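
    As a minimal sketch of that API difference (not from the original post; the class name ApiComparison is made up, and the broker address and topic reuse the values from earlier), the two entry points look roughly like this. The DStream half additionally needs spark-streaming-kafka-0-10 on the classpath:

    package com.dx.streaming.producer;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class ApiComparison {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("api-comparison");

            // Spark Streaming (DStream API): micro-batches of ConsumerRecord driven by a StreamingContext.
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            kafkaParams.put("bootstrap.servers", "192.168.0.121:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
            kafkaParams.put("group.id", "testgroup");
            JavaInputDStream<ConsumerRecord<String, byte[]>> dstream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, byte[]>Subscribe(Arrays.asList("t-my"), kafkaParams));

            // Structured Streaming (Dataset API): a streaming Dataset<Row> built from a SparkSession;
            // this is the style used in the rest of this post.
            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
            Dataset<Row> df = spark.readStream().format("kafka")
                    .option("kafka.bootstrap.servers", "192.168.0.121:9092")
                    .option("subscribe", "t-my")
                    .load();

            // In a real job you would pick one of the two models, add processing, and start/await it;
            // both halves are shown here only to contrast the entry points.
        }
    }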

    When programming with Structured Streaming against Kafka + Spark, the Maven dependencies you need are:

    <!-- spark-sql -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!-- spark-sql -->
            
            <!-- spark-core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!-- spark-core -->
            
            <!-- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!-- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html -->
    
            <!-- Spark Streaming Programming Guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide -->
            <!--
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            -->
            <!-- Spark Streaming Programming Guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide -->
    
            <!-- Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html -->
            <!-- 
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            -->
            <!-- Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0  or higher) http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html -->
    
            <!-- kafka client -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.1</version>
            </dependency>
            <!-- kafka client -->
    
            <!-- avro -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.21</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.8.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.twitter</groupId>
                <artifactId>bijection-avro_2.10</artifactId>
                <version>0.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.7.4</version>
            </dependency>
            <!-- avro -->

    Since what we read from Kafka is the Avro record serialized as a byte[], it has to be decoded into rows: first convert the byte[] back into a record, then unpack the record into an Object[], and finally turn that into a Row with RowFactory.create(Object[]). The decoding is implemented as a standalone UDF:

    package com.dx.streaming.producer;
    
    import java.text.SimpleDateFormat;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.api.java.UDF1;
    
    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    
    public class AvroParserUDF implements UDF1<byte[], Row> {
        private static final long serialVersionUID = -2369806025607566774L;
        private String avroSchemaFilePath=null;
        private transient Schema schema = null;
        private transient Injection<GenericRecord, byte[]> recordInjection = null;
    
        public AvroParserUDF(String avroSchemaFilePath) {
            this.avroSchemaFilePath=avroSchemaFilePath;
        }
    
        public Row call(byte[] data) throws Exception {
            if(this.recordInjection==null){            
                this.schema = SchemaUtil.getAvroSchemaFromHDFSFile(this.avroSchemaFilePath);        
                this.recordInjection = GenericAvroCodecs.toBinary(schema);        
            }
            
            GenericRecord record = this.recordInjection.invert(data).get();
    
            int timeIndex = record.getSchema().getFields().indexOf(record.getSchema().getField("time"));
    
            int iColumns = record.getSchema().getFields().size();
            Object[] values = new Object[iColumns];
            for (int i = 0; i < iColumns; i++) {
                values[i] = record.get(i);
                if (values[i] instanceof org.apache.avro.util.Utf8) {
                    values[i] = values[i].toString();
                }
            }
            // Normalize the time string to the start of its day,
            // e.g. "2018-07-03 21:23:58" -> "2018-07-03 00:00:00".
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
            values[timeIndex] = df.format(sdf.parse((String) values[timeIndex]));
    
            return RowFactory.create(values);
        }
    }

    The overall approach: read the given Kafka topic with sparkSession.readStream().format("kafka"), convert the byte[] payload into Rows, and then operate on those Rows.

    package com.dx.streaming.producer;
    
    import java.text.SimpleDateFormat;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.avro.Schema;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    import com.databricks.spark.avro.SchemaConverters;
    
    public class TestConsumer {
        //private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
        private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
        private static final String topic = "t-my";
    
        public static void main(String[] args) throws Exception {
            String appName = "Test Avro";
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
            SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
    
            Map<String, String> kafkaOptions = new HashMap<String, String>();
            kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");
            
            Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);    
            AvroParserUDF udf = new AvroParserUDF(avroFilePath);
            StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();        
            sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));
    
            Dataset<Row> stream = sparkSession.readStream().format("kafka").options(kafkaOptions).option("subscribe", topic).option("startingOffsets", "earliest").load().select("value").as(Encoders.BINARY())
                    .selectExpr("deserialize(value) as row").select("row.*");
    
            stream.printSchema();
    
            // Print new data to console
            StreamingQuery query = stream.writeStream().format("console").start();
            
            try {
                query.awaitTermination();
                sparkSession.streams().awaitAnyTermination();
            } catch (StreamingQueryException e) {
                e.printStackTrace();
            }
        }
    }
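
    The consumer above only prints the decoded rows to the console. As a hypothetical sketch of "operating on the rows" (not in the original post), a simple aggregation could be added inside TestConsumer's main method, building on the same stream Dataset, for example counting records per userid:

    // Hypothetical continuation of TestConsumer.main: count decoded records per userid.
    Dataset<Row> countsByUser = stream.groupBy("userid").count();

    // Streaming aggregations written to the console need the "complete" (or "update") output mode.
    StreamingQuery countQuery = countsByUser.writeStream()
            .outputMode("complete")
            .format("console")
            .start();
    countQuery.awaitTermination();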

    Package the jar and submit it with spark-submit:

    [spark@master work]$ more submit.sh 
    #! /bin/bash
    jars=""
    
    for file in `ls /home/spark/work/jars/*.jar`
    do
            jars=$file,$jars
            #echo $jars
    done
    
    
    echo "------------------------------------"
    echo $jars
    echo "------------------------------------"
    
    /opt/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
    --jars $jars \
    --master yarn \
    --verbose \
    --driver-java-options "-XX:+TraceClassPaths" \
    --num-executors 2 \
    --executor-memory 1G \
    --executor-cores 1 \
    --driver-memory 1G \
    --class com.dx.streaming.producer.TestConsumer \
    /home/spark/work/kafka-streaming-test.jar
    
    #--properties-file /home/spark/work/conf/spark-properties.conf 

    jars:

    [spark@master work]$ cd jars
    [spark@master jars]$ ls
    bijection-avro_2.11-0.9.5.jar  kafka-clients-0.10.0.1.jar  spark-sql_2.11-2.2.0.jar             spark-streaming_2.11-2.2.0.jar
    bijection-core_2.11-0.9.5.jar  spark-avro_2.11-3.2.0.jar   spark-sql-kafka-0-10_2.11-2.2.0.jar  spark-streaming-kafka-0-10_2.11-2.2.0.jar

    Output (note: this run was submitted with spark-submit):

    +--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
    |            ip|            identity|userid|               time|requestinfo|state|responce|referer|useragent|    timestamp|
    +--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
    |36:177:233:179|27be47c9-bcbc-4cd...|    27|2019-11-03 00:00:00|       ....|   88|     ...|    ...|      ...|1530624238000|
    |251:92:177:212|d711ca29-e2a7-4fb...|    24|2020-04-03 00:00:00|       ....|  129|     ...|    ...|      ...|1530570507000|
    |26:177:105:119|a98020dd-4fcb-4a0...|     4|2018-11-03 00:00:00|       ....|  322|     ...|    ...|      ...|1530619861000|
    |161:25:246:252|11bd7af7-b9db-428...|     3|2021-10-03 00:00:00|       ....|  249|     ...|    ...|      ...|1530582412000|
    | 48:131:47:112|c519b7cb-0265-4db...|     6|2021-09-03 00:00:00|       ....|  234|     ...|    ...|      ...|1530578717000|
    |  43:74:113:73|e5888022-97ad-425...|    99|2019-02-03 00:00:00|       ....|  406|     ...|    ...|      ...|1530584052000|
    |230:162:238:87|ae9ecc0d-6df5-418...|    55|2022-09-03 00:00:00|       ....|  128|     ...|    ...|      ...|1530561467000|
    |  0:138:183:88|2565b673-baed-4c9...|    85|2019-03-03 00:00:00|       ....|  460|     ...|    ...|      ...|1530548103000|
    |210:30:157:209|59a0f81c-7dfc-444...|    31|2021-07-03 00:00:00|       ....|  179|     ...|    ...|      ...|1530632595000|
    | 129:251:8:241|5483365c-79ef-429...|    96|2022-03-03 00:00:00|       ....|  368|     ...|    ...|      ...|1530600670000|
    |  32:70:106:42|d1dfa208-2a3f-4fe...|    40|2020-01-03 00:00:00|       ....|  184|     ...|    ...|      ...|1530559512000|
    |95:109:238:129|709eebbc-13fc-4e9...|    11|2019-02-03 00:00:00|       ....|  463|     ...|    ...|      ...|1530623652000|
    |123:171:142:15|0a4cc7d1-bdac-442...|    79|2022-08-03 00:00:00|       ....|  417|     ...|    ...|      ...|1530590205000|
    | 72:141:54:221|b94d268a-a464-4d7...|    94|2021-07-03 00:00:00|       ....|    1|     ...|    ...|      ...|1530567806000|
    |201:79:234:119|f1ca2db5-1688-459...|    66|2018-07-03 00:00:00|       ....|  531|     ...|    ...|      ...|1530565671000|
    |188:41:197:190|fe3d9faf-5376-4bb...|    86|2022-08-03 00:00:00|       ....|  522|     ...|    ...|      ...|1530568567000|
    | 197:115:58:51|1c9494e2-5dcc-4a4...|    73|2018-11-03 00:00:00|       ....|  214|     ...|    ...|      ...|1530630682000|
    | 213:242:0:177|e06cd131-da6d-499...|    11|2022-05-03 00:00:00|       ....|  530|     ...|    ...|      ...|1530604390000|
    | 70:109:32:120|37c95b44-d692-48e...|    66|2018-07-03 00:00:00|       ....|    7|     ...|    ...|      ...|1530576459000|
    |100:203:217:78|cff08213-b679-4b2...|    51|2020-04-03 00:00:00|       ....|  128|     ...|    ...|      ...|1530548883000|
    +--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
    only showing top 20 rows
    
    18/07/13 05:58:36 INFO streaming.StreamExecution: Streaming query made progress: {
      "id" : "efd34a20-36ae-48a5-89c3-2107bab3cbca",
      "runId" : "a73386c3-34cf-43ec-abe8-904671e269c8",
      "name" : null,
      "timestamp" : "2018-07-12T21:58:31.590Z",
      "numInputRows" : 19800,
      "processedRowsPerSecond" : 3887.6889848812093,
      "durationMs" : {
        "addBatch" : 3595,
        "getBatch" : 252,
        "getOffset" : 612,
        "queryPlanning" : 122,
        "triggerExecution" : 5092,
        "walCommit" : 487
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaSource[Subscribe[t-my]]",
        "startOffset" : null,
        "endOffset" : {
          "t-my" : {
            "0" : 19800
          }
        },
        "numInputRows" : 19800,
        "processedRowsPerSecond" : 3887.6889848812093
      } ],
      "sink" : {
        "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@20bb170f"
      }
    }

    References:

    Spark Structured Streaming readStream/writeStream input and output, and ETL in between

    Getting started with Spark Structured Streaming programming

    Structured Streaming: design and implementation overview

    Spark Structured Streaming programming guide

    Kafka: how to read the contents of the offsets topic (__consumer_offsets)
