zoukankan      html  css  js  c++  java
  • Structured Streaming demo

    
    	import org.apache.spark.sql.SparkSession
    
    	import org.apache.spark.sql.functions._
    
    	import org.apache.spark.sql.types.TimestampType
    
    	import org.apache.spark.sql.streaming.Trigger
    
    	import java.sql.Timestamp
    
    	
    
    	object StockCCICompute {

    	  /**
    	   * Entry point: consumes stock quotes from Kafka, computes per-company
    	   * highest / lowest / closing price over tumbling event-time windows,
    	   * and hands each aggregated window to `HBaseWriter` (intended to write to HBase).
    	   *
    	   * Kafka record layout, as implied by the parsing below:
    	   *   key   = company number
    	   *   value = "timestamp,price,bidprice,sellprice,avgprice" (comma separated)
    	   *
    	   * @param args optional; args(0) overrides the Kafka bootstrap servers and
    	   *             args(1) overrides the subscribed topic. Without arguments the
    	   *             original placeholder defaults are used.
    	   */
    	  def main(args: Array[String]): Unit = {
    	    val spark = SparkSession
    	      .builder
    	      .appName("StockCCICompute")
    	      .getOrCreate()

    	    // Window length, maximum tolerated lateness (watermark delay) and trigger interval.
    	    val windowDuration = "30 minutes"
    	    val waterThreshold = "5 minutes"
    	    val triggerTime = "1 minutes"

    	    // The defaults are placeholders; replace them or pass real values on the command line.
    	    val bootstrapServers = args.lift(0).getOrElse("broker1:port1,broker2:port2")
    	    val topic = args.lift(1).getOrElse("stock")

    	    import spark.implicits._

    	    try {
    	      spark.readStream
    	        .format("kafka")
    	        .option("kafka.bootstrap.servers", bootstrapServers)
    	        .option("subscribe", topic)
    	        .load()
    	        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    	        .as[(String, String)]
    	        // Parse the payload. Records with no key, no value, or fewer than five
    	        // comma-separated fields are dropped; indexing them blindly would throw
    	        // ArrayIndexOutOfBoundsException and fail the whole streaming query.
    	        .flatMap(record => {
    	          val companyNo = record._1
    	          val payload = record._2
    	          val fields = if (payload == null) Array.empty[String] else payload.split(",")
    	          if (companyNo == null || fields.length < 5) Nil
    	          else List((companyNo, fields(0), fields(1), fields(2), fields(3), fields(4)))
    	        })
    	        .toDF("companyno", "timestamp", "price", "bidprice", "sellpirce", "avgprice")
    	        // NOTE(review): assumes the timestamp text is in a format Spark's CAST
    	        // accepts (e.g. "yyyy-MM-dd HH:mm:ss"); otherwise it becomes null — confirm.
    	        .selectExpr(
    	          "CAST(companyno AS STRING)",
    	          "CAST(timestamp AS TIMESTAMP)",
    	          "CAST(price AS DOUBLE)",
    	          "CAST(bidprice AS DOUBLE)",
    	          "CAST(sellpirce AS DOUBLE)",
    	          "CAST(avgprice AS DOUBLE)")
    	        .as[(String, Timestamp, Double, Double, Double, Double)]
    	        // Event-time watermark: data later than waterThreshold may be discarded.
    	        .withWatermark("timestamp", waterThreshold)
    	        .groupBy(
    	          window($"timestamp", windowDuration),
    	          $"companyno")
    	        // Highest, lowest and closing price; the closing price comes from a custom UDAF.
    	        // NOTE(review): Spark does not guarantee row order inside an aggregation, so a
    	        // close-price UDAF that only receives `price` cannot know which row is latest
    	        // in event time; it presumably needs the timestamp too — TODO confirm.
    	        .agg(
    	          max(col("price")).as("max_price"),
    	          min(col("price")).as("min_price"),
    	          // Alias the aggregate result, not its input column.
    	          ClosePriceUDAF(col("price")).as("latest_price"))
    	        .writeStream
    	        // With a watermark, append mode emits each window once it is finalized.
    	        .outputMode("append")
    	        .trigger(Trigger.ProcessingTime(triggerTime))
    	        // Output sink (HBase, per the original design).
    	        .foreach(HBaseWriter)
    	        .start()
    	        .awaitTermination()
    	    } finally {
    	      // Release the session if the query terminates or fails.
    	      spark.stop()
    	    }
    	  }
    	}
    
    
    
    
  • 相关阅读:
    UVa 531 Compromise
    UVa 10130 SuperSale
    UVa 624 CD
    2015年第一天有感
    Bootstrap3.0学习(一)
    IIS上.net注册
    11g Oracle导出表 默认不导出数据为空的表解决
    Oracle数据库密码重置、导入导出库命令
    每天进步一点--WCF学习笔记
    C#每天进步一点--异步编程模式
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html
Copyright © 2011-2022 走看看