1、print()
print操作会将DStream每一个batch中的前10个元素在driver节点打印出来。
ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream") .map((_, 1)) .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10)) .print()
2. saveAsTextFiles(prefix, [suffix])
这个操作可以将DStream中的内容保存为text文件,每个batch的数据单独保存为一个文夹,文件夹名前缀参数必须传入,文件夹名后缀参数可选,最终文件夹名称的完整形式为prefix-TIME_IN_MS[.suffix]
ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream") .map((_, 1)) .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10)) .saveAsTextFiles("file:\tmp","txt")
3、saveAsObjectFiles(prefix, [suffix])
这个操作和前面一个类似,只不过这里将DStream中的内容保存为SequenceFile文件类型,这个文件中保存的数据都是经过序列化后的Java对象。
4.saveAsHadoopFiles(prefix, [suffix])
这个操作和前两个类似,将DStream每一batch中的内容保存到HDFS上,同样可以指定文件的前缀和后缀。
5.foreachRDD(func)
可以作为存储DB系统操作
注:可以每个partition中初始化一个连接等
ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream") .map((_, 1)) .foreachRDD(f => { // 初始化连接存储系统等 f.foreachPartition(p => { p.foreach(println) }) })