zoukankan      html  css  js  c++  java
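  • Structured Streaming demo: per-company high/low/close prices over 30-minute windows from a Kafka stock feed, written to HBase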

    
    	import org.apache.spark.sql.SparkSession
    
    	import org.apache.spark.sql.functions._
    
    	import org.apache.spark.sql.types.TimestampType
    
    	import org.apache.spark.sql.streaming.Trigger
    
    	import java.sql.Timestamp
    
    	
    
    	/**
    	 * Spark Structured Streaming job that reads stock ticks from Kafka and computes,
    	 * per company and per event-time window, the highest, lowest and closing (latest)
    	 * price, then hands each result row to HBaseWriter.
    	 *
    	 * Kafka record layout expected by the parsing step:
    	 *   key   = company number
    	 *   value = "timestamp,price,bidprice,sellprice,avgprice" (comma-separated)
    	 *
    	 * Optional command-line arguments:
    	 *   args(0) — Kafka bootstrap servers (default: the placeholder "broker1:port1,broker2:port2")
    	 *   args(1) — Kafka topic to subscribe to (default: "stock")
    	 */
    	object StockCCICompute {

    	  // Number of comma-separated fields required in each Kafka record value.
    	  private val ExpectedFieldCount = 5

    	  /**
    	   * Splits one Kafka (key, value) pair into six raw string columns:
    	   * (companyno, timestamp, price, bidprice, sellprice, avgprice).
    	   *
    	   * Returns None when the value is null or has fewer than ExpectedFieldCount
    	   * fields, so a single malformed message is skipped instead of failing the
    	   * whole streaming query with a NullPointerException or
    	   * ArrayIndexOutOfBoundsException. Extra trailing fields are ignored.
    	   */
    	  private def parseRecord(record: (String, String)): Option[(String, String, String, String, String, String)] = {
    	    val (companyNo, value) = record
    	    Option(value)
    	      .map(_.split(","))
    	      .filter(_.length >= ExpectedFieldCount)
    	      .map(fields => (companyNo, fields(0), fields(1), fields(2), fields(3), fields(4)))
    	  }

    	  def main(args: Array[String]): Unit = {
    	    val spark = SparkSession
    	      .builder
    	      .appName("StockCCICompute")
    	      .getOrCreate()

    	    // Window length, maximum tolerated lateness (watermark delay) and trigger interval.
    	    val windowDuration = "30 minutes"
    	    val watermarkDelay = "5 minutes"
    	    val triggerInterval = "1 minutes"

    	    // Kafka connection settings; overridable from the command line, defaults unchanged.
    	    val bootstrapServers = args.lift(0).getOrElse("broker1:port1,broker2:port2")
    	    val topic = args.lift(1).getOrElse("stock")

    	    import spark.implicits._

    	    spark.readStream
    	      .format("kafka")
    	      .option("kafka.bootstrap.servers", bootstrapServers)
    	      .option("subscribe", topic)
    	      .load()
    	      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    	      .as[(String, String)]
    	      // Parse the raw record; malformed records are dropped rather than failing the query.
    	      .flatMap(record => parseRecord(record).toList)
    	      .toDF("companyno", "timestamp", "price", "bidprice", "sellprice", "avgprice")
    	      .selectExpr(
    	        "CAST(companyno AS STRING) AS companyno",
    	        "CAST(timestamp AS TIMESTAMP) AS timestamp",
    	        "CAST(price AS DOUBLE) AS price",
    	        "CAST(bidprice AS DOUBLE) AS bidprice",
    	        "CAST(sellprice AS DOUBLE) AS sellprice",
    	        "CAST(avgprice AS DOUBLE) AS avgprice")
    	      .as[(String, Timestamp, Double, Double, Double, Double)]
    	      // Event-time watermark: data later than watermarkDelay behind the max seen event time may be dropped.
    	      .withWatermark("timestamp", watermarkDelay)
    	      .groupBy(window($"timestamp", windowDuration), $"companyno")
    	      // High, low and close price per (window, company). ClosePriceUDAF is defined elsewhere.
    	      // NOTE(review): it only receives price, so which row counts as "close" depends on the
    	      // UDAF's implementation and row order — confirm it does not need the timestamp too.
    	      .agg(
    	        max(col("price")).as("max_price"),
    	        min(col("price")).as("min_price"),
    	        ClosePriceUDAF(col("price")).as("latest_price"))
    	      .writeStream
    	      // Append mode: each window's row is emitted once, after the watermark passes the window end.
    	      .outputMode("append")
    	      .trigger(Trigger.ProcessingTime(triggerInterval))
    	      // Write results to HBase via HBaseWriter (defined elsewhere; presumably a ForeachWriter — confirm).
    	      .foreach(HBaseWriter)
    	      .start()
    	      .awaitTermination()
    	  }
    	}
    
    
    
    
  • 相关阅读:
    再论 ASP.NET 中获取客户端IP地址
    修改MariaDB 路径
    CentOS MariaDB 安装和配置
    asp.net core 使用protobuf
    Xamarin绑定微信SDK 实现分享功能
    iOS中转义后的html标签如何还原
    MvvmCross框架在XamarinForms中的使用入门
    Xamarin.Form 初学 之 服务引用-WCF服务引用
    程序员求职面试三部曲之三:快速适应新的工作环境
    程序员求职面试三部曲之二:提高面试的成功率
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html
Copyright © 2011-2022 走看看