Spark Streaming introductory example:
Run the following in spark-shell:
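```scala
import org.apache.spark._
import org.apache.spark.streaming._

// spark-shell already provides `sc`. sc.getConf returns a copy of the config,
// so calling setMaster/setAppName on it has no effect; pass --master / --name
// when launching spark-shell instead.

// Micro-batch interval: 2 seconds
val ssc = new StreamingContext(sc, Seconds(2))
// Watch /tmp/data; only files that appear after the stream starts are read
val fileStream = ssc.textFileStream("/tmp/data")
// fileStream.print()  // alternative: print the first lines of each batch
fileStream.foreachRDD { rdd =>
  println("********************start*************************")
  // Runs on the executors; in local mode the output shows up in this console
  rdd.foreach { x => println(x) }
  println("*********************end************************")
}
ssc.start()
ssc.awaitTermination()  // blocks the shell until the context is stopped
```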
Then run the following bash script, which writes data files with random names into /tmp/data:
```bash
#!/bin/bash
mkdir -p /tmp/data
#rm -rf "/tmp/data"
rm -f /tmp/data/*   # leave the glob unquoted so that it expands
for ((j=0;j<30;j++)); do
  {
    for ((i=0;i<20;i++)); do
      # use a random float as the file name
      file_name=`python3 -c 'import random; print(random.random())'`
      echo "$j $i is sad story." >"/tmp/data/$file_name.log"
    done
    sleep 1
  }
done
echo "OK, waiting..."
echo "done"
```
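The Spark Streaming programming guide asks that files appear in the monitored directory atomically (by moving or renaming them in), while the script above writes each file in place with `>`, so a batch could in principle pick up a half-written file. Below is a minimal sketch of the same loop that writes each file into a staging directory first and then moves it into the watched directory with `mv`. The function names `write_atomically` and `generate_data`, and the staging directory itself, are hypothetical and not part of the original post. The sketch assumes the staging directory is on the same filesystem as the watched one, because otherwise `mv` falls back to copy + delete, which is not atomic.

```bash
#!/bin/bash
# write_atomically STAGING_DIR DATA_DIR NAME CONTENT  (hypothetical helper)
# Writes CONTENT to STAGING_DIR/NAME, then renames it into DATA_DIR so a
# watcher only ever sees the complete file.
write_atomically() {
  local staging="$1" data="$2" name="$3" content="$4"
  printf '%s\n' "$content" > "$staging/$name" || return 1
  mv "$staging/$name" "$data/$name"   # atomic rename on the same filesystem
}

# generate_data DATA_DIR STAGING_DIR ROUNDS FILES_PER_ROUND SLEEP_SECONDS
# (hypothetical helper mirroring the loop in the script above)
generate_data() {
  local data="$1" staging="$2" rounds="$3" per_round="$4" pause="$5"
  local j i
  mkdir -p "$data" "$staging" || return 1
  for ((j = 0; j < rounds; j++)); do
    for ((i = 0; i < per_round; i++)); do
      # the loop counters keep names unique within one run;
      # $RANDOM only adds some spread across runs
      write_atomically "$staging" "$data" "$j-$i-$RANDOM.log" \
        "$j $i is sad story." || return 1
    done
    sleep "$pause"
  done
  echo "OK, waiting..."
}
```

For example, `generate_data /tmp/data /tmp/data_staging 30 20 1` reproduces the original 30 rounds of 20 files with a one-second pause while the streaming job is running. The staging directory is a sibling of /tmp/data rather than a subdirectory, so the file stream never lists it.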
The output: