zoukankan      html  css  js  c++  java
  • Structured Streaming demo

    
    	import org.apache.spark.sql.SparkSession
    
    	import org.apache.spark.sql.functions._
    
    	import org.apache.spark.sql.types.TimestampType
    
    	import org.apache.spark.sql.streaming.Trigger
    
    	import java.sql.Timestamp
    
    	
    
    	/**
    	 * Structured Streaming job that reads stock ticks from the Kafka topic `stock`
    	 * and, per company and per event-time window, computes the highest price, the
    	 * lowest price and the closing (latest) price.
    	 *
    	 * Input records: the Kafka key is used as the company number and the Kafka value
    	 * is a comma-separated string whose first five fields are
    	 * `timestamp,price,bidprice,sellpirce,avgprice`.
    	 *
    	 * Results are emitted in `append` mode through `HBaseWriter`, which is defined
    	 * outside this file (per the original comment it writes to HBase).
    	 */
    	object StockCCICompute {

    	  /**
    	   * Entry point: builds the streaming query and blocks until it terminates.
    	   *
    	   * @param args command-line arguments (unused)
    	   */
    	  def main(args: Array[String]): Unit = {

    	    val spark = SparkSession
    	      .builder
    	      .appName("StockCCICompute")
    	      .getOrCreate()

    	    // Window length, maximum tolerated lateness (watermark delay) and trigger interval.
    	    val windowDuration = "30 minutes"
    	    val waterThreshold = "5 minutes"
    	    val triggerTime = "1 minutes"

    	    import spark.implicits._

    	    spark.readStream
    	      .format("kafka")
    	      .option("kafka.bootstrap.servers", "broker1:port1,broker2:port2")
    	      .option("subscribe", "stock")
    	      .load()
    	      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    	      .as[(String, String)]
    	      // Parse the payload. Records with a null value or fewer than five
    	      // comma-separated fields are skipped so that a single malformed message
    	      // cannot fail the whole streaming query.
    	      .flatMap { record =>
    	        val companyNo = record._1
    	        val fields = Option(record._2).map(_.split(",")).getOrElse(Array.empty[String])
    	        if (fields.length >= 5)
    	          Seq((companyNo, fields(0), fields(1), fields(2), fields(3), fields(4)))
    	        else
    	          Seq.empty[(String, String, String, String, String, String)]
    	      }
    	      .toDF("companyno", "timestamp", "price", "bidprice", "sellpirce", "avgprice")
    	      .selectExpr(
    	        "CAST(companyno AS STRING)",
    	        "CAST(timestamp AS TIMESTAMP)",
    	        "CAST(price AS DOUBLE)",
    	        "CAST(bidprice AS DOUBLE)",
    	        "CAST(sellpirce AS DOUBLE)",
    	        "CAST(avgprice AS DOUBLE)")
    	      .as[(String, Timestamp, Double, Double, Double, Double)]
    	      // Set the watermark on the event-time column.
    	      .withWatermark("timestamp", waterThreshold)
    	      .groupBy(
    	        window($"timestamp", windowDuration),
    	        $"companyno")
    	      // Highest, lowest and closing price per window; the closing price relies on
    	      // the custom aggregate ClosePriceUDAF (defined outside this file).
    	      .agg(
    	        max(col("price")).as("max_price"),
    	        min(col("price")).as("min_price"),
    	        // The alias belongs on the aggregate result, not on the input column.
    	        ClosePriceUDAF(col("price")).as("latest_price"))
    	      .writeStream
    	      .outputMode("append")
    	      .trigger(Trigger.ProcessingTime(triggerTime))
    	      // Hand each output row to HBaseWriter (per the original comment: output to HBase).
    	      .foreach(HBaseWriter)
    	      .start()
    	      .awaitTermination()

    	  }

    	}
    
    
    
    
  • 相关阅读:
    OpenCV用读取矩阵,访问图像数据
    OpenCV_Add方法
    OpenCV_颜色直方图的计算、显示、处理、对比及反向投影
    sift算法研究_无匹配
    OpenCV_轮廓例子
    OpenCV_用鼠标在窗口画方形
    【转】数字图像处理中的形态学
    OpenCV_ 滑动条模拟按钮
    OpenCV_轮廓的查找、表达、绘制、特性及匹配
    图像的膨胀与腐蚀、细化
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html
Copyright © 2011-2022 走看看