Sending Avro-format data to a Kafka topic
Step 1: define the Avro schema:
{
  "type": "record",
  "name": "userlog",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "identity", "type": "string"},
    {"name": "userid", "type": "int"},
    {"name": "time", "type": "string"},
    {"name": "requestinfo", "type": "string"},
    {"name": "state", "type": "int"},
    {"name": "responce", "type": "string"},
    {"name": "referer", "type": "string"},
    {"name": "useragent", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
Save the definition above as an Avro schema file named userlog.avsc.
The schema contains the fields ip:string, identity:string, userid:int, time:string, requestinfo:string, state:int, responce:string, referer:string, useragent:string and timestamp:long, which together describe one web request log entry.
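To make the field/type contract concrete, here is a JDK-only sketch (no Avro dependency) that mirrors userlog.avsc as an ordered map and checks a value against a field's declared type. It only imitates what Avro does: GenericData.Record.put rejects unknown field names, and a value of the wrong Java type fails when the record is serialized. The class UserlogFields and its methods are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical JDK-only mirror of userlog.avsc; not part of the Avro API.
final class UserlogFields {
    // Field name -> Java type used by Avro's generic API for that primitive
    // (string -> CharSequence, int -> Integer, long -> Long), in schema order.
    static final Map<String, Class<?>> FIELDS;

    static {
        Map<String, Class<?>> m = new LinkedHashMap<>();
        m.put("ip", CharSequence.class);
        m.put("identity", CharSequence.class);
        m.put("userid", Integer.class);
        m.put("time", CharSequence.class);
        m.put("requestinfo", CharSequence.class);
        m.put("state", Integer.class);
        m.put("responce", CharSequence.class); // spelling kept exactly as in the schema
        m.put("referer", CharSequence.class);
        m.put("useragent", CharSequence.class);
        m.put("timestamp", Long.class);
        FIELDS = Collections.unmodifiableMap(m);
    }

    private UserlogFields() {}

    /** True if name is a schema field and value has the matching Java type. */
    static boolean accepts(String name, Object value) {
        Class<?> expected = FIELDS.get(name);
        return expected != null && expected.isInstance(value);
    }

    /** Zero-based position of a field in schema order, or -1 if it is not a field. */
    static int position(String name) {
        int i = 0;
        for (String key : FIELDS.keySet()) {
            if (key.equals(name)) {
                return i;
            }
            i++;
        }
        return -1;
    }
}
```

This is why the producer code stores timestamp as a long (Date.getTime() returns long) and userid and state as int.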
Step 2: create a producer object that sends data to the topic:
To send data to Kafka, we first need a producer object, created through the Kafka client API, that writes records to Kafka:
private static Producer<String, byte[]> createProducer() {
    Properties props = new Properties();
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    // Kafka broker address(es)
    props.put("bootstrap.servers", "192.168.0.121:9092");
    Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
    return producer;
}
This requires the Kafka client library: kafka-clients-0.10.0.1.jar.
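The settings in createProducer are standard Kafka producer configuration keys. As a sketch, the same Properties can be built with only java.util and a comment on what each value does; the class ProducerSettings is hypothetical, and the values are copied from createProducer above.

```java
import java.util.Properties;

// Hypothetical helper; the keys are standard Kafka producer config names.
final class ProducerSettings {
    private ProducerSettings() {}

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // e.g. "192.168.0.121:9092"
        props.put("acks", "all");            // leader waits for all in-sync replicas to acknowledge
        props.put("retries", 0);             // failed sends are not retried
        props.put("batch.size", 16384);      // max bytes per per-partition batch (16 KiB)
        props.put("linger.ms", 1);           // wait up to 1 ms for more records to fill a batch
        props.put("buffer.memory", 33554432); // 32 MiB for records not yet sent to the broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }
}
```

Note that Properties.getProperty returns null for the Integer values; read them back with get().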
Step 3: parse the Avro schema file into a Schema object, and use it to create a record object (GenericRecord)
Parsing the Avro schema file into a Schema object requires the Avro library: avro-1.7.5.jar.
Here we define a class, SchemaUtil.java, whose getAvroSchemaFromHDFSFile method reads an Avro schema file from HDFS and parses it into a Schema object.
package com.dx.streaming.producer;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemaUtil {
    public static Schema getAvroSchemaFromHDFSFile(String hdfsAvroFile) throws IOException {
        Path pt = new Path(hdfsAvroFile);
        // FileSystem.get returns a cached instance shared by the whole JVM (including Spark),
        // so it is deliberately not closed here; only the input stream is closed.
        FileSystem fs = FileSystem.get(new Configuration());
        if (!fs.exists(pt)) {
            throw new FileNotFoundException(pt + " does not exist");
        }
        try (InputStream inputStream = fs.open(pt)) {
            Schema.Parser parser = new Schema.Parser();
            return parser.parse(inputStream);
        }
    }
}
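When the .avsc file sits on the local disk (as in the Windows path used by TestProducer), Hadoop's FileSystem is not strictly needed. A JDK-only sketch that reads the file as UTF-8 text; the resulting JSON string can be passed to new Schema.Parser().parse(String), which parses schema JSON text rather than a path. The class LocalSchemaText is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical local-disk alternative to SchemaUtil; returns the schema JSON as text.
final class LocalSchemaText {
    private LocalSchemaText() {}

    /** Reads a schema file as UTF-8; Files.readAllBytes opens and closes the file itself. */
    static String read(String avscPath) throws IOException {
        Path path = Paths.get(avscPath);
        if (!Files.exists(path)) {
            throw new NoSuchFileException(avscPath);
        }
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }
}
```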
Next, create a record object (GenericRecord) from the Schema object; the record holds the actual data to be produced.
Random random = new Random();
String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
String identity = UUID.randomUUID().toString();
int userid = random.nextInt(100);
SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
Date date = new Date();
String yyyyMMdd = dfs.format(date);
String time = yyyyMMdd + random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
String requestInfo = "....";
int state = random.nextInt(600);
String responce = "...";
String referer = "...";
String useragent = "...";
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

GenericRecord record = new GenericData.Record(schema);
record.put("ip", ip);
record.put("identity", identity);
record.put("userid", userid);
record.put("time", time);
record.put("requestinfo", requestInfo);
record.put("state", state);
record.put("responce", responce);
record.put("referer", referer);
record.put("useragent", useragent);
record.put("timestamp", format.parse(time).getTime());
Note: the code above creates a GenericRecord according to the schema; the GenericRecord holds the actual data.
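The snippet above joins the IP octets with ':' and builds the time without zero padding (e.g. 9:36:37); SimpleDateFormat's default lenient parsing still accepts that. A sketch of an alternative generator that emits dotted IPv4 and a zero-padded time, so the string round-trips exactly through "yyyy-MM-dd HH:mm:ss". The class UserlogValues is hypothetical, and the dotted format is a deliberate change from the original.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Hypothetical value generator for the userlog fields.
final class UserlogValues {
    private UserlogValues() {}

    /** Dotted IPv4 string; each octet is in 0..255. */
    static String randomIp(Random random) {
        return random.nextInt(256) + "." + random.nextInt(256) + "."
                + random.nextInt(256) + "." + random.nextInt(256);
    }

    /** "yyyy-MM-dd HH:mm:ss" on the given day with a random, zero-padded time of day. */
    static String randomTimeOn(Date day, Random random) {
        String date = new SimpleDateFormat("yyyy-MM-dd").format(day);
        return String.format("%s %02d:%02d:%02d", date,
                random.nextInt(24), random.nextInt(60), random.nextInt(60));
    }

    /** Epoch milliseconds for the "timestamp" field, parsed in the JVM's default time zone. */
    static long toTimestamp(String time) throws ParseException {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(time).getTime();
    }
}
```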
The record can then be converted to byte[] through an Injection<GenericRecord, byte[]> object, which makes it easy to transmit when producing data:
String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
byte[] bytes = recordInjection.apply(record);
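As we understand bijection's GenericAvroCodecs.toBinary, it produces the plain Avro binary encoding of the record (via Avro's BinaryEncoder) with no file header and no embedded schema, which is why the consumer must hold the same schema. Per the Avro specification, int and long are written as zigzag varints, a string is its byte length (as a long) followed by the UTF-8 bytes, and a record is simply its fields concatenated in schema order. A JDK-only sketch of those primitive encodings; AvroBinarySketch is hypothetical and not a replacement for BinaryEncoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Avro's binary encoding for int/long and string.
final class AvroBinarySketch {
    private AvroBinarySketch() {}

    /** Avro int and long: zigzag-map the sign, then write 7 bits per byte, low bits first. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag: 0->0, -1->1, 1->2, -2->3, ...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // high bit set: more bytes follow
            z >>>= 7;
        }
        out.write((int) z);
    }

    /** Avro string: byte length as a long, then the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static byte[] encodeLong(long n) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, n);
        return out.toByteArray();
    }
}
```

Encoding a whole userlog record would mean calling writeString or writeLong once per field in schema order; no field names or tags appear in the bytes.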
On the consumer side, when data arrives, the same kind of Injection<GenericRecord, byte[]> recordInjection object decodes the received byte[] back into a GenericRecord.
Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
props.put("group.id", "testgroup");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("topic name"));
// Schema.Parser.parse(String) expects the schema JSON itself, so load the file with parse(File)
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("avro schema file path"));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<String, byte[]> record : records) {
            // Decode the Avro binary payload back into a GenericRecord
            GenericRecord genericRecord = recordInjection.invert(record.value()).get();
            String info = String.format("topic = %s, partition = %s, offset = %d, key = %s, ip = %s",
                    record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("ip"));
            logger.info(info);
        }
    }
} finally {
    consumer.close();
}
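recordInjection.invert(...) reverses the binary encoding. Because the bytes carry no field names, a reader has to walk them in schema order using the same schema. The JDK-only reader below (AvroBinaryReader is hypothetical, not Avro's BinaryDecoder) decodes the two primitive shapes userlog uses: zigzag varints and length-prefixed UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical cursor over Avro binary data; illustrates decoding only.
final class AvroBinaryReader {
    private final byte[] buf;
    private int pos;

    AvroBinaryReader(byte[] buf) {
        this.buf = buf;
    }

    /** Reads a zigzag varint (Avro int or long). */
    long readLong() {
        long z = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 63) throw new IllegalStateException("varint too long");
            if (pos >= buf.length) throw new IllegalStateException("truncated varint");
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift; // low 7 bits first
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1); // undo zigzag
    }

    /** Reads an Avro string: a long length, then that many UTF-8 bytes. */
    String readString() {
        int len = (int) readLong();
        if (len < 0 || pos + len > buf.length) throw new IllegalStateException("bad string length");
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    boolean atEnd() {
        return pos == buf.length;
    }
}
```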
Note: for more details, see the article "Kafka: ZK+Kafka+Spark Streaming cluster setup (13): define an Avro schema, use a consumer to send an Avro byte stream, and a producer to receive and parse it".
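Step 4: send the data to the topic through the producer:
To send byte[] data to Kafka, first create a producer object through the Kafka API, then convert the record above to byte[] and call the producer's send method.
Producer<String, byte[]> producer = createProducer();
// Build the Schema object from the Avro schema file.
// Create a record from the Schema object and store the data in it.
// Create the Injection<GenericRecord, byte[]> that converts a record to byte[].
try {
    byte[] bytes = recordInjection.apply(record);
    ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
    // send() is asynchronous and returns a Future<RecordMetadata>; delivery failures are
    // reported through that future (or a Callback), not necessarily through this catch block
    producer.send(msg);
} catch (Exception e) {
    e.printStackTrace();
}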
The four steps above outline how to turn the data to be produced into a record, convert the record to byte[], and send it to Kafka. The complete implementation follows:
package com.dx.streaming.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class TestProducer {
    // Backslashes must be escaped in Java string literals
    private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
    //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        int size = 0;
        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        Producer<String, byte[]> producer = createProducer();
        Random random = new Random();
        SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        while (true) {
            String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
            String identity = UUID.randomUUID().toString();
            int userid = random.nextInt(100);
            String yyyyMMdd = dfs.format(new Date());
            String time = yyyyMMdd + random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
            String requestInfo = "....";
            int state = random.nextInt(600);
            String responce = "...";
            String referer = "...";
            String useragent = "...";
            long timestamp = format.parse(time).getTime();

            GenericRecord record = new GenericData.Record(schema);
            record.put("ip", ip);
            record.put("identity", identity);
            record.put("userid", userid);
            record.put("time", time);
            record.put("requestinfo", requestInfo);
            record.put("state", state);
            record.put("responce", responce);
            record.put("referer", referer);
            record.put("useragent", useragent);
            record.put("timestamp", timestamp);
            System.out.println("ip:" + ip + ",identity:" + identity + ",userid:" + userid + ",time:" + time + ",timestamp:" + timestamp);

            try {
                byte[] bytes = recordInjection.apply(record);
                ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
                producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }

            size++;
            if (size % 100 == 0) {
                Thread.sleep(100);
                if (size > 1000) {
                    break;
                }
            }
        }

        // Print the topic's partition information
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        for (PartitionInfo p : partitions) {
            System.out.println(p);
        }
        System.out.println("send message over.");
        // Flush buffered records first so the short close timeout does not drop any
        producer.flush();
        producer.close(100, TimeUnit.MILLISECONDS);
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // Kafka broker address(es)
        props.put("bootstrap.servers", "192.168.0.121:9092");
        Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        return producer;
    }
}
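The pacing logic in main is easy to misread: the break is only checked when size is a multiple of 100, so the loop exits after the 1,100th send, having slept 11 times. A sketch that replays the same size/sleep/break control flow without Kafka or real sleeps; SendLoopReplay is a hypothetical name.

```java
// Hypothetical replay of the loop-control logic in TestProducer.main.
final class SendLoopReplay {
    private SendLoopReplay() {}

    /** Returns {messagesSent, sleeps} for the size/sleep/break logic. */
    static int[] replay() {
        int size = 0;
        int sent = 0;
        int sleeps = 0;
        while (true) {
            sent++; // stands in for the producer's send(msg)
            size++;
            if (size % 100 == 0) {
                sleeps++; // stands in for Thread.sleep(100)
                if (size > 1000) {
                    break;
                }
            }
        }
        return new int[] {sent, sleeps};
    }
}
```

To stop after exactly 1,000 messages, the condition would need to be size >= 1000.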
The pom.xml configuration at this point:
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.5</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
Note: some of these dependencies are not needed by the producer code above, but the Structured Streaming side below requires them.
Test output:
ip:229:21:203:40,identity:ae6fde10-4687-4682-a760-d9076892eb45,userid:9,time:2018-07-12 12:57:24,timestamp:1531371444000
ip:105:224:103:61,identity:edef8c93-da4e-46d4-bfd3-551b74e6f4df,userid:1,time:2018-07-12 23:57:23,timestamp:1531411043000
ip:252:230:234:213,identity:80e00a81-f6dd-4bf6-93a1-95154babdd08,userid:59,time:2018-07-12 9:36:37,timestamp:1531359397000
ip:76:63:136:50,identity:630b66fb-95d7-4c63-a638-6f24396987d0,userid:33,time:2018-07-12 19:18:18,timestamp:1531394298000
Partition(topic = t-my, partition = 0, leader = 0, replicas = [0,], isr = [0,]
send message over.
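The time and timestamp values printed above describe the same instant: format.parse(time) interprets the string in the JVM's default time zone. The printed pairs are consistent with a UTC+8 zone (for example Asia/Shanghai); that zone is inferred from the output, not stated in the code. A java.time sketch (TimestampCheck is hypothetical) that converts epoch milliseconds back to wall-clock time with an explicit offset:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for checking the time/timestamp pairs in the producer output.
final class TimestampCheck {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private TimestampCheck() {}

    /** Formats epoch milliseconds as local wall-clock time in the given zone or offset. */
    static String toLocal(long epochMillis, String zoneId) {
        LocalDateTime t = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zoneId));
        return FMT.format(t);
    }
}
```

For example, TimestampCheck.toLocal(1531359397000L, "+08:00") gives "2018-07-12 09:36:37", the zero-padded form of the 9:36:37 line above.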
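Reading the Kafka data with Structured Streaming
Notes:
The code below uses Structured Streaming, not the older Spark Streaming (DStream) API;
the two use different APIs and a different processing model, so be clear about which one you are using;
when using Structured Streaming with Kafka, you need the following Maven dependencies: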
<!-- spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
     http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- Spark Streaming Programming Guide
     http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide -->
<!--
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
-->
<!-- Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
     http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html -->
<!--
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
-->
<!-- kafka client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<!-- logging -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.21</version>
</dependency>
<!-- avro: declare org.apache.avro:avro only once; use the Scala 2.11 build of bijection to match Spark -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.5</version>
</dependency>
<!-- spark-avro: provides com.databricks.spark.avro.SchemaConverters used by TestConsumer -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
</dependency>
Since what we read from Kafka is the byte[] form of Avro records, those bytes must be parsed into rows: first byte[] is converted to a GenericRecord, then the record is converted to Object[], and finally RowFactory.create(Object[]) turns it into a Row. The parsing is done by a separate UDF class:
package com.dx.streaming.producer;

import java.text.SimpleDateFormat;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroParserUDF implements UDF1<byte[], Row> {
    private static final long serialVersionUID = -2369806025607566774L;
    private String avroSchemaFilePath = null;
    // Not serializable: rebuilt lazily on each executor after the UDF is deserialized
    private transient Schema schema = null;
    private transient Injection<GenericRecord, byte[]> recordInjection = null;

    public AvroParserUDF(String avroSchemaFilePath) {
        this.avroSchemaFilePath = avroSchemaFilePath;
    }

    public Row call(byte[] data) throws Exception {
        if (this.recordInjection == null) {
            this.schema = SchemaUtil.getAvroSchemaFromHDFSFile(this.avroSchemaFilePath);
            this.recordInjection = GenericAvroCodecs.toBinary(schema);
        }
        // byte[] -> GenericRecord
        GenericRecord record = this.recordInjection.invert(data).get();
        int timeIndex = record.getSchema().getField("time").pos();
        int iColumns = record.getSchema().getFields().size();
        // GenericRecord -> Object[]; Avro returns strings as Utf8, so convert them to String for the Row
        Object[] values = new Object[iColumns];
        for (int i = 0; i < iColumns; i++) {
            values[i] = record.get(i);
            if (values[i] instanceof org.apache.avro.util.Utf8) {
                values[i] = values[i].toString();
            }
        }
        // Truncate "time" to midnight of the same day. Use mm (minutes) and ss (seconds):
        // "HH:MM:SS" would read the minutes as a month and the seconds as milliseconds.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
        values[timeIndex] = df.format(sdf.parse((String) values[timeIndex]));
        // Object[] -> Row
        return RowFactory.create(values);
    }
}
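In SimpleDateFormat, MM is the month and SS is milliseconds, while mm and ss are minutes and seconds. With the default lenient parser, the pattern "yyyy-MM-dd HH:MM:SS" therefore reads the minutes of "2018-07-03 21:23:58" as month 23, which rolls over into November 2019. The JDK-only sketch below (DayTruncation is hypothetical) runs the UDF's parse-then-truncate step with both patterns on that input.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical reproduction of the UDF's "truncate time to midnight" step.
final class DayTruncation {
    private DayTruncation() {}

    /** Parses with the given pattern, then formats the same Date as "yyyy-MM-dd 00:00:00". */
    static String truncate(String time, String parsePattern) throws ParseException {
        SimpleDateFormat parser = new SimpleDateFormat(parsePattern); // lenient by default
        SimpleDateFormat day = new SimpleDateFormat("yyyy-MM-dd 00:00:00"); // '0' and ':' are literals
        return day.format(parser.parse(time));
    }
}
```

Because lenient parsing never rejects such input, the wrong pattern fails silently; a fixed input like this one is a cheap unit test for the UDF.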
Approach: read the given Kafka topic with sparkSession.readStream().format("kafka"), convert the byte[] values into Rows, and then operate on the Rows.
package com.dx.streaming.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.databricks.spark.avro.SchemaConverters;

public class TestConsumer {
    //private static final String avroFilePath = "D:/Java_Study/workspace/kafka-streaming-learn/conf/avro/userlog.avsc";
    private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        String appName = "Test Avro";
        // No setMaster here: a master set on SparkConf takes precedence over spark-submit's --master,
        // so hard-coding local[*] would keep the job off YARN. Add .setMaster("local[*]") for local runs.
        SparkConf conf = new SparkConf().setAppName(appName);
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        Map<String, String> kafkaOptions = new HashMap<String, String>();
        kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");

        // Derive the Spark SQL row type from the Avro schema and register the deserializer UDF
        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
        AvroParserUDF udf = new AvroParserUDF(avroFilePath);
        StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();
        sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));

        Dataset<Row> stream = sparkSession.readStream()
                .format("kafka")
                .options(kafkaOptions)
                .option("subscribe", topic)
                .option("startingOffsets", "earliest")
                .load()
                .select("value").as(Encoders.BINARY())
                .selectExpr("deserialize(value) as row")
                .select("row.*");

        stream.printSchema();

        // Print new data to the console
        StreamingQuery query = stream.writeStream().format("console").start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
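The deserialize UDF registered in TestConsumer only needs to truncate time to the start of its day. As an alternative to SimpleDateFormat, java.time's DateTimeFormatter is immutable and thread-safe, and single-letter H:m:s accepts both padded and unpadded values such as the producer's 9:36:37. A sketch of a helper that could replace the two SimpleDateFormat objects in AvroParserUDF.call; DayStart is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time replacement for the UDF's truncation step.
final class DayStart {
    // Single-letter H, m, s accept one or two digits, e.g. "9:36:37" or "21:23:58".
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd H:m:s");
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DayStart() {}

    /** Returns the start of the day of the given time, formatted as yyyy-MM-dd 00:00:00. */
    static String truncate(String time) {
        return LocalDateTime.parse(time, IN).toLocalDate().atStartOfDay().format(OUT);
    }
}
```

Unlike the lenient SimpleDateFormat, this throws DateTimeParseException on malformed input instead of silently rolling values over.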
Package the project and submit it with spark-submit:
[spark@master work]$ more submit.sh
#!/bin/bash
jars=""
for file in /home/spark/work/jars/*.jar
do
    jars=$file,$jars
    #echo $jars
done
echo "------------------------------------"
echo $jars
echo "------------------------------------"
/opt/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
    --jars $jars \
    --master yarn \
    --verbose \
    --driver-java-options "-XX:+TraceClassPaths" \
    --num-executors 2 \
    --executor-memory 1G \
    --executor-cores 1 \
    --driver-memory 1G \
    --class com.dx.streaming.producer.TestConsumer \
    /home/spark/work/kafka-streaming-test.jar #--properties-file /home/spark/work/conf/spark-properties.conf
Contents of the jars directory:
[spark@master work]$ cd jars
[spark@master jars]$ ls
bijection-avro_2.11-0.9.5.jar  kafka-clients-0.10.0.1.jar  spark-sql_2.11-2.2.0.jar  spark-streaming_2.11-2.2.0.jar
bijection-core_2.11-0.9.5.jar  spark-avro_2.11-3.2.0.jar  spark-sql-kafka-0-10_2.11-2.2.0.jar  spark-streaming-kafka-0-10_2.11-2.2.0.jar
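Output (submitted with spark-submit). Note that the time column does not agree with the timestamp column: the UDF that produced this output parsed time with "yyyy-MM-dd HH:MM:SS", where MM is the month and SS the milliseconds, so each minute value was read as a month and leniently rolled over into a later year (hence values such as 2019-11-03 for records from July 2018). With "yyyy-MM-dd HH:mm:ss" the column keeps each record's own date and only the time of day is zeroed.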
+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
| ip| identity|userid| time|requestinfo|state|responce|referer|useragent| timestamp|
+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
|36:177:233:179|27be47c9-bcbc-4cd...| 27|2019-11-03 00:00:00| ....| 88| ...| ...| ...|1530624238000|
|251:92:177:212|d711ca29-e2a7-4fb...| 24|2020-04-03 00:00:00| ....| 129| ...| ...| ...|1530570507000|
|26:177:105:119|a98020dd-4fcb-4a0...| 4|2018-11-03 00:00:00| ....| 322| ...| ...| ...|1530619861000|
|161:25:246:252|11bd7af7-b9db-428...| 3|2021-10-03 00:00:00| ....| 249| ...| ...| ...|1530582412000|
| 48:131:47:112|c519b7cb-0265-4db...| 6|2021-09-03 00:00:00| ....| 234| ...| ...| ...|1530578717000|
| 43:74:113:73|e5888022-97ad-425...| 99|2019-02-03 00:00:00| ....| 406| ...| ...| ...|1530584052000|
|230:162:238:87|ae9ecc0d-6df5-418...| 55|2022-09-03 00:00:00| ....| 128| ...| ...| ...|1530561467000|
| 0:138:183:88|2565b673-baed-4c9...| 85|2019-03-03 00:00:00| ....| 460| ...| ...| ...|1530548103000|
|210:30:157:209|59a0f81c-7dfc-444...| 31|2021-07-03 00:00:00| ....| 179| ...| ...| ...|1530632595000|
| 129:251:8:241|5483365c-79ef-429...| 96|2022-03-03 00:00:00| ....| 368| ...| ...| ...|1530600670000|
| 32:70:106:42|d1dfa208-2a3f-4fe...| 40|2020-01-03 00:00:00| ....| 184| ...| ...| ...|1530559512000|
|95:109:238:129|709eebbc-13fc-4e9...| 11|2019-02-03 00:00:00| ....| 463| ...| ...| ...|1530623652000|
|123:171:142:15|0a4cc7d1-bdac-442...| 79|2022-08-03 00:00:00| ....| 417| ...| ...| ...|1530590205000|
| 72:141:54:221|b94d268a-a464-4d7...| 94|2021-07-03 00:00:00| ....| 1| ...| ...| ...|1530567806000|
|201:79:234:119|f1ca2db5-1688-459...| 66|2018-07-03 00:00:00| ....| 531| ...| ...| ...|1530565671000|
|188:41:197:190|fe3d9faf-5376-4bb...| 86|2022-08-03 00:00:00| ....| 522| ...| ...| ...|1530568567000|
| 197:115:58:51|1c9494e2-5dcc-4a4...| 73|2018-11-03 00:00:00| ....| 214| ...| ...| ...|1530630682000|
| 213:242:0:177|e06cd131-da6d-499...| 11|2022-05-03 00:00:00| ....| 530| ...| ...| ...|1530604390000|
| 70:109:32:120|37c95b44-d692-48e...| 66|2018-07-03 00:00:00| ....| 7| ...| ...| ...|1530576459000|
|100:203:217:78|cff08213-b679-4b2...| 51|2020-04-03 00:00:00| ....| 128| ...| ...| ...|1530548883000|
+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
only showing top 20 rows
18/07/13 05:58:36 INFO streaming.StreamExecution: Streaming query made progress: {
"id" : "efd34a20-36ae-48a5-89c3-2107bab3cbca",
"runId" : "a73386c3-34cf-43ec-abe8-904671e269c8",
"name" : null,
"timestamp" : "2018-07-12T21:58:31.590Z",
"numInputRows" : 19800,
"processedRowsPerSecond" : 3887.6889848812093,
"durationMs" : {
"addBatch" : 3595,
"getBatch" : 252,
"getOffset" : 612,
"queryPlanning" : 122,
"triggerExecution" : 5092,
"walCommit" : 487
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[t-my]]",
"startOffset" : null,
"endOffset" : {
"t-my" : {
"0" : 19800
}
},
"numInputRows" : 19800,
"processedRowsPerSecond" : 3887.6889848812093
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@20bb170f"
}
}