# Rate source

The `rate` source generates rows with a `timestamp` and a monotonically increasing `value` at a configurable rate, which makes it convenient for testing Structured Streaming jobs.
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("study_structured_streaming")
    .enableHiveSupport()
    .config("spark.debug.maxToStringFields", "100")
    .getOrCreate()
)

# rate source: emits `timestamp` and `value` columns at the configured rate
df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")   # note: the option name is rowsPerSecond
    .option("rampUpTime", "1s")
    .option("numPartitions", "2")
    .load()
)

# console sink: prints each micro-batch to stdout
(
    df.writeStream
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()   # blocks until the query stops
)

spark.stop()
```
# Sink example

A file sink (here CSV) writes each micro-batch into the output directory given by `path` and requires a `checkpointLocation` for fault tolerance.
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("study_structured_streaming")
    .enableHiveSupport()
    .config("spark.debug.maxToStringFields", "100")
    .getOrCreate()
)

df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .option("rampUpTime", "1s")
    .option("numPartitions", "2")
    .load()
)

# CSV sink: checkpointLocation is required for file sinks;
# "path" is treated as an output directory, not a single file
(
    df.writeStream
    .format("csv")
    .option("checkpointLocation", "ck/20210623/")
    .option("path", "output/123.csv")
    .start()
    .awaitTermination()
)

spark.stop()
```