zoukankan      html  css  js  c++  java
  • Structured Streaming demo

    
    	import org.apache.spark.sql.SparkSession
    
    	import org.apache.spark.sql.functions._
    
    	import org.apache.spark.sql.types.TimestampType
    
    	import org.apache.spark.sql.streaming.Trigger
    
    	import java.sql.Timestamp
    
    	
    
    	/**
    	 * Spark Structured Streaming job: reads stock ticks from Kafka, computes per-company
    	 * high / low / closing price over event-time windows, and writes the result to HBase.
    	 *
    	 * Each Kafka record is expected to carry the company number as its key and a
    	 * comma-separated value `timestamp,price,bidprice,sellprice,avgprice`.
    	 */
    	object StockCCICompute {

    	  /**
    	   * Entry point.
    	   *
    	   * @param args optional overrides: `args(0)` = Kafka bootstrap servers,
    	   *             `args(1)` = topic to subscribe to. Both fall back to the
    	   *             original hard-coded values when absent.
    	   */
    	  def main(args: Array[String]): Unit = {
    	    val spark = SparkSession
    	      .builder
    	      .appName("StockCCICompute")
    	      .getOrCreate()

    	    // Window length, maximum tolerated lateness (watermark delay) and trigger interval.
    	    val windowDuration = "30 minutes"
    	    val watermarkDelay = "5 minutes"
    	    val triggerTime = "1 minutes"

    	    // Kafka connection; placeholders kept as defaults for backward compatibility.
    	    val bootstrapServers = args.lift(0).getOrElse("broker1:port1,broker2:port2")
    	    val topic = args.lift(1).getOrElse("stock")

    	    import spark.implicits._

    	    spark.readStream
    	      .format("kafka")
    	      .option("kafka.bootstrap.servers", bootstrapServers)
    	      .option("subscribe", topic)
    	      .load()
    	      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    	      .as[(String, String)]
    	      // Parse the payload; malformed records (null value or fewer than 5 fields)
    	      // are dropped instead of throwing and failing the whole streaming query.
    	      .flatMap(record => parseRecord(record._1, record._2).toList)
    	      .toDF("companyno", "timestamp", "price", "bidprice", "sellprice", "avgprice")
    	      .selectExpr(
    	        "CAST(companyno AS STRING)",
    	        "CAST(timestamp AS TIMESTAMP)",
    	        "CAST(price AS DOUBLE)",
    	        "CAST(bidprice AS DOUBLE)",
    	        "CAST(sellprice AS DOUBLE)",
    	        "CAST(avgprice AS DOUBLE)")
    	      // `as` keeps the column names above, so "timestamp" still resolves below.
    	      .as[(String, Timestamp, Double, Double, Double, Double)]
    	      // Event-time watermark: data arriving later than this delay may be discarded,
    	      // which lets append mode finalize and emit each window.
    	      .withWatermark("timestamp", watermarkDelay)
    	      .groupBy(window($"timestamp", windowDuration), $"companyno")
    	      // Highest, lowest and closing price per window; the closing price needs a
    	      // custom UDAF (ClosePriceUDAF, defined elsewhere).
    	      .agg(
    	        max(col("price")).as("max_price"),
    	        min(col("price")).as("min_price"),
    	        // NOTE(review): the UDAF only receives `price`, and Spark does not guarantee
    	        // row order within a group — confirm how ClosePriceUDAF picks the latest price.
    	        ClosePriceUDAF(col("price")).as("latest_price"))
    	      .writeStream
    	      .outputMode("append")
    	      .trigger(Trigger.ProcessingTime(triggerTime))
    	      // Sink: HBaseWriter (defined elsewhere; presumably a ForeachWriter[Row] — verify).
    	      .foreach(HBaseWriter)
    	      .start()
    	      .awaitTermination()
    	  }

    	  /**
    	   * Splits one Kafka record into its raw string fields.
    	   *
    	   * @param companyNo the record key (company number), passed through unchanged
    	   * @param payload   the record value, expected as `timestamp,price,bidprice,sellprice,avgprice`
    	   * @return the six raw fields, or `None` when the payload is null or has fewer than 5 fields
    	   */
    	  private def parseRecord(
    	      companyNo: String,
    	      payload: String): Option[(String, String, String, String, String, String)] =
    	    Option(payload).map(_.split(",")).collect {
    	      case fields if fields.length >= 5 =>
    	        (companyNo, fields(0), fields(1), fields(2), fields(3), fields(4))
    	    }
    	}
    
    
    
    
  • 相关阅读:
    wcf通道Channel
    固定位置右下角
    小闹钟(无样式)
    CSS小注意(初级)
    java少包汇总
    maven的pom.xml配置
    myeclipse 手动安装 lombok
    Could not synchronize database state with session
    (转)myeclipse插件—SVN分支与合并详解【图】
    Nginx的启动、停止与重启
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html
Copyright © 2011-2022 走看看