zoukankan      html  css  js  c++  java
  • Structured Streaming demo

    
    	import org.apache.spark.sql.SparkSession
    
    	import org.apache.spark.sql.functions._
    
    	import org.apache.spark.sql.types.TimestampType
    
    	import org.apache.spark.sql.streaming.Trigger
    
    	import java.sql.Timestamp
    
    	
    
    	/**
    	 * Structured Streaming demo job: reads stock ticks from the Kafka topic
    	 * "stock", aggregates them per company over event-time windows and hands
    	 * the per-window highest, lowest and closing price to `HBaseWriter`.
    	 *
    	 * `ClosePriceUDAF` and `HBaseWriter` are not defined in this file; they are
    	 * assumed to be a user-defined aggregate function and a
    	 * `ForeachWriter[Row]` provided elsewhere in the project.
    	 */
    	object StockCCICompute {

    	  /**
    	   * Builds and starts the streaming query, then blocks until it terminates.
    	   *
    	   * Each Kafka record is expected to carry the company number as its key and
    	   * a comma-separated value of the form
    	   * `timestamp,price,bidprice,sellprice,avgprice`.
    	   *
    	   * @param args command-line arguments (unused)
    	   */
    	  def main(args: Array[String]): Unit = {
    	    val spark = SparkSession
    	      .builder
    	      .appName("StockCCICompute")
    	      .getOrCreate()

    	    // Window length, maximum tolerated lateness (watermark delay) and
    	    // trigger interval, respectively.
    	    val windowDuration = "30 minutes"
    	    val waterThreshold = "5 minutes"
    	    val triggerTime = "1 minutes"

    	    import spark.implicits._

    	    spark.readStream
    	      .format("kafka")
    	      .option("kafka.bootstrap.servers", "broker1:port1,broker2:port2")
    	      .option("subscribe", "stock")
    	      .load()
    	      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    	      .as[(String, String)]
    	      // Parse the payload. Records with a null value or fewer than five
    	      // fields are skipped instead of throwing, because an exception here
    	      // would terminate the whole streaming query.
    	      .flatMap { record =>
    	        val companyNo = record._1
    	        Option(record._2).map(_.split(",")) match {
    	          case Some(infos) if infos.length >= 5 =>
    	            Seq((companyNo, infos(0), infos(1), infos(2), infos(3), infos(4)))
    	          case _ =>
    	            Seq.empty[(String, String, String, String, String, String)]
    	        }
    	      }
    	      // NOTE: "sellpirce" is kept as spelled in the original column list so
    	      // that any downstream consumer relying on the name keeps working.
    	      .toDF("companyno", "timestamp", "price", "bidprice", "sellpirce", "avgprice")
    	      .selectExpr(
    	        "CAST(companyno AS STRING)",
    	        "CAST(timestamp AS TIMESTAMP)",
    	        "CAST(price AS DOUBLE)",
    	        "CAST(bidprice AS DOUBLE)",
    	        "CAST(sellpirce AS DOUBLE)",
    	        "CAST(avgprice AS DOUBLE)")
    	      .as[(String, Timestamp, Double, Double, Double, Double)]
    	      // Set the watermark on the event-time column.
    	      .withWatermark("timestamp", waterThreshold)
    	      .groupBy(
    	        window($"timestamp", windowDuration),
    	        $"companyno")
    	      // Highest, lowest and closing price per window; the closing price
    	      // needs a custom UDAF. The alias is applied to the aggregate result
    	      // (not to its input column) so the output column is "latest_price".
    	      .agg(
    	        max(col("price")).as("max_price"),
    	        min(col("price")).as("min_price"),
    	        ClosePriceUDAF(col("price")).as("latest_price"))
    	      .writeStream
    	      .outputMode("append")
    	      .trigger(Trigger.ProcessingTime(triggerTime))
    	      // Write the results to HBase.
    	      .foreach(HBaseWriter)
    	      .start()
    	      .awaitTermination()
    	  }
    	}
    
    
    
    
  • 相关阅读:
    一键完成SAP部署的秘密,想知道么?
    Azure进阶攻略丨Azure网络通不通,PsPing&PaPing告诉你答案
    在科技圈不懂“机器学习”?那你就out了
    狂欢圣诞节,Azure云邀你一起云端跑酷!
    计划程序:拒绝重复工作,让效率翻倍!
    爱,除了看怎么说,还要看怎么做 !
    Azure 12 月新公布
    开发者为何对Service Fabric爱不释手?值得关注!
    matlab之plot()函数
    对C++指针的一个比喻
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html
Copyright © 2011-2022 走看看