  • One way to make a dimension table refresh automatically in a Spark Structured Streaming join

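    The approach: write each updated version of the dimension table into a new, latest partition, and have the streaming query join only against the partition with the largest `par` value.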
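
The refresh depends on `max(par)` always picking the newest snapshot. The post does not show how `par` values are formatted, so the sketch below assumes (hypothetically) a string timestamp in `yyyyMMddHHmm` form. Fixed-width, zero-padded keys matter here: for strings, `max` compares lexicographically, so an unpadded key such as `2021781030` could sort before an older one. The names `partition_key` and `latest_snapshot` are illustrative only, and this is plain Python, not Spark.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Row = Dict[str, int]

def partition_key(ts: datetime) -> str:
    """Hypothetical partition value: fixed width and zero padded, so string order == time order."""
    return ts.strftime("%Y%m%d%H%M")

def latest_snapshot(partitions: Dict[str, List[Row]]) -> Tuple[str, List[Row]]:
    """Mimic `where par = (select max(par) from ...)`: return the newest partition and its rows."""
    newest = max(partitions)  # lexicographic max over fixed-width keys
    return newest, partitions[newest]

# Two published versions of a toy dimension table (hypothetical data).
dim_partitions = {
    partition_key(datetime(2021, 7, 8, 9, 0)): [{"id": 1}, {"id": 2}],
    partition_key(datetime(2021, 7, 8, 10, 30)): [{"id": 1}, {"id": 2}, {"id": 3}],
}

par, rows = latest_snapshot(dim_partitions)
print(par, [r["id"] for r in rows])  # 202107081030 [1, 2, 3]
```

Publishing a new version therefore only means adding a partition with a larger key. The old partitions stay in place and can be dropped later.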

    # coding=utf-8
    
    from pyspark.sql.types import IntegerType, StructType
    from pyspark.sql import SparkSession
    import datetime
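    # HiveWrapper comes from the author's internal trancarelib package and is not
    # used below, so it is commented out to keep the script runnable elsewhere.
    # from trancarelib.db.hive_util import HiveWrapper
    
    print('--->begin')
    print('Program start time:', datetime.datetime.now())
    # Hive support is needed so spark.sql() can read the test.* Hive tables.
    spark = (
        SparkSession.builder
        .appName("study_structured_streaming")
        .enableHiveSupport()
        .config("spark.debug.maxToStringFields", "100")
        .getOrCreate()
    )
    # hivewrapper = HiveWrapper(spark)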
    #-----------------------------------------------------------------------------------------------------------------------
    Schema_dim = StructType().add("id", IntegerType(), True)
    Schema_fact = StructType().add("id", IntegerType(), True).add("cnt", IntegerType(),True)
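    # Static side: expose only the newest partition (max(par)) of each dimension table.
    # createOrReplaceTempView() returns None, so the result is not assigned.
    spark.sql(
        "select * from test.t_20210708_dim"
        " where par = (select max(par) from test.t_20210708_dim)"
    ).createOrReplaceTempView("t_dim")
    spark.sql(
        "select * from test.t_20210708_join"
        " where par = (select max(par) from test.t_20210708_join)"
    ).createOrReplaceTempView("t_join")
    # Streaming side: the fact table's ORC files, read as they arrive.
    spark.readStream.schema(Schema_fact).orc(
        "s3://transsion-sc/user/hive/warehouse/test.db/t_20210708/"
    ).createOrReplaceTempView("t0")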
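    # Stream-static join: each fact row from t0 is matched against the
    # partitions registered as t_dim and t_join. The t_dim_par / t_join_par
    # columns show which partition each output row was joined against.
    df1 = spark.sql(
        "select t0.id as id, t_dim.par as t_dim_par, t_join.par as t_join_par"
        " from t0"
        " join t_dim on t0.id = t_dim.id"
        " join t_join on t0.cnt = t_join.id"
    )
    # Print every micro-batch to the console without truncating column values.
    query = (
        df1.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", False)
        .start()
    )
    query.awaitTermination()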
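
The behaviour this pattern relies on can be simulated in plain Python. This is a sketch under the assumption that the static side (the `max(par)` query) is resolved again for each micro-batch; it is not Spark's implementation. Spark may cache table metadata, so whether a new partition is actually picked up (or needs something like `REFRESH TABLE`) is best confirmed from the `t_dim_par` column in the console output. Only `t_dim` is modelled; `t_join` would work the same way. All function names and data are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Dict[str, int]

def resolve_latest(partitions: Dict[str, List[Row]]) -> Tuple[str, List[Row]]:
    """Same role as `select * from dim where par = (select max(par) from dim)`."""
    par = max(partitions)
    return par, partitions[par]

def run_micro_batch(batch: List[Row], dim_partitions: Dict[str, List[Row]]) -> List[Tuple[int, str]]:
    """Inner-join one micro-batch of fact rows with the newest dimension snapshot.

    The snapshot is resolved again at the start of every batch, which is the
    behaviour this refresh pattern assumes of the static side of the join.
    Each output tuple mirrors `t0.id as id, t_dim.par as t_dim_par`.
    """
    dim_par, dim_rows = resolve_latest(dim_partitions)
    dim_ids = {r["id"] for r in dim_rows}
    # Inner join: fact rows with no matching dimension id are dropped.
    return [(r["id"], dim_par) for r in batch if r["id"] in dim_ids]

# Start with a single dimension partition.
dim = {"20210708": [{"id": 1}, {"id": 2}]}

out1 = run_micro_batch([{"id": 1, "cnt": 5}, {"id": 3, "cnt": 7}], dim)
print(out1)  # [(1, '20210708')]  (id 3 has no match yet)

# A new, larger dimension version is published as the latest partition.
dim["20210709"] = [{"id": 1}, {"id": 2}, {"id": 3}]

out2 = run_micro_batch([{"id": 3, "cnt": 2}], dim)
print(out2)  # [(3, '20210709')]  (the next batch sees the new partition)
```

Note that a fact row is not retried: the `id = 3` row in the first batch is lost even after the new dimension version arrives, because the join is an inner join evaluated once per batch.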
    

  • Original post: https://www.cnblogs.com/muyue123/p/15016711.html