Spark Streaming 是核心 Spark API 的扩展，可对实时数据流进行可扩展、高吞吐量、容错的流处理。下面的示例用 textFileStream 监控一个 HDFS 目录，并用 Spark SQL 的 regexp_extract 从每行日志中提取字段。
bin/spark-submit --class Streaming /home/wx/Stream.jar
hadoop fs -put /home/wx/123.txt /user/wx/
文本文件 123.txt 的内容如下（每行一条记录）。注意：textFileStream 只处理流启动后新出现在监控目录中的文件，因此要先执行 spark-submit，再执行 hadoop fs -put：
NOTICE:07-26 logId[0072]
NOTICE:07-26 logId[0073]
NOTICE:07-26 logId[0074]
NOTICE:07-26 logId[0075]
NOTICE:07-26 logId[0076]
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SparkSession
/**
 * Spark Streaming example: watches an HDFS directory for newly created text files and,
 * for every 1-second micro-batch, uses Spark SQL's `regexp_extract` to pull the
 * `NOTICE:<two digits>` prefix and the `logId[...]` token out of each line.
 *
 * Submit with: bin/spark-submit --class Streaming Stream.jar
 */
object Streaming {
  def main(args: Array[String]): Unit = {
    // NOTE(review): a master set in code takes precedence over spark-submit's --master;
    // local[2] is kept as in the original.
    val conf = new SparkConf().setMaster("local[2]").setAppName("RegexpExtract")
    val ssc = new StreamingContext(conf, Seconds(1))
    println("hello world")

    // Only files that appear in this directory after the stream starts are processed.
    val lines = ssc.textFileStream("hdfs://name-ha/user/wx/")
    // The original separator literal contained a raw line break, which does not compile;
    // "\n" is the intended separator.
    val records = lines.flatMap(_.split("\n"))
    records.print()

    records.foreachRDD { rdd =>
      // Skip empty batches so no query runs and no empty table is printed every second.
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) the SparkSession bound to this RDD's configuration.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // One-column DataFrame of raw lines, exposed to SQL as the view "df".
        val linesDataFrame = rdd.toDF("str_col")
        linesDataFrame.createOrReplaceTempView("df")

        // Since Spark 2.0 the SQL parser unescapes backslashes in string literals
        // (spark.sql.parser.escapedStringLiterals=false by default). Regex escapes must
        // therefore be doubled: "\\d" reaches the regex engine as \d, and "\\[" as \[.
        val extracted = spark.sql(
          """
          select str_col,
          regexp_extract(str_col, "NOTICE:\\d{2}", 0) notice,
          regexp_extract(str_col, "logId\\[(.*?)\\]", 0) logId
          from df""")
        extracted.show(false)
      }
    }

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Block until the streaming job is stopped
  }
}
执行结果
hello world
-------------------------------------------
Time: 1501501752000 ms
-------------------------------------------
NOTICE:07-26 logId[0072]
NOTICE:07-26 logId[0073]
NOTICE:07-26 logId[0074]
NOTICE:07-26 logId[0075]
NOTICE:07-26 logId[0076]

+------------------------+---------+-----------+
|str_col                 |notice   |logId      |
+------------------------+---------+-----------+
|NOTICE:07-26 logId[0072]|NOTICE:07|logId[0072]|
|NOTICE:07-26 logId[0073]|NOTICE:07|logId[0073]|
|NOTICE:07-26 logId[0074]|NOTICE:07|logId[0074]|
|NOTICE:07-26 logId[0075]|NOTICE:07|logId[0075]|
|NOTICE:07-26 logId[0076]|NOTICE:07|logId[0076]|
+------------------------+---------+-----------+

-------------------------------------------
Time: 1501501770000 ms
-------------------------------------------