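  • One way to implement automatic refresh of dimension tables

    The approach is to write each updated copy of a dimension table into a new partition, and have the streaming job always read the partition with the largest partition value.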

    # coding=utf-8

    import datetime

    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, StructType
    from trancarelib.db.hive_util import HiveWrapper

    print('--->begin')
    print('Program start time:', datetime.datetime.now())
    # The chained builder calls are wrapped in parentheses so they can span several lines.
    spark = (
        SparkSession.builder
        .appName("study_structured_streaming")
        .enableHiveSupport()
        .config("spark.debug.maxToStringFields", "100")
        .getOrCreate()
    )
    hivewrapper = HiveWrapper(spark)
    #-----------------------------------------------------------------------------------------------------------------------
    Schema_dim = StructType().add("id", IntegerType(), True)
    Schema_fact = StructType().add("id", IntegerType(), True).add("cnt", IntegerType(), True)
    # Register only the newest partition (largest par) of each dimension table as a temp view.
    # createOrReplaceTempView() returns None, so its result is not assigned to a variable.
    spark.sql("select * from test.t_20210708_dim where par=(select max(par) from test.t_20210708_dim)").createOrReplaceTempView('t_dim')
    spark.sql("select * from test.t_20210708_join where par=(select max(par) from test.t_20210708_join)").createOrReplaceTempView('t_join')
    # Streaming fact source: ORC files under the fact table's directory.
    spark.readStream.schema(Schema_fact).orc("s3://transsion-sc/user/hive/warehouse/test.db/t_20210708/").createOrReplaceTempView('t0')
    # Join the streaming fact view against both dimension views.
    df1 = spark.sql(
        "select t0.id as id,t_dim.par as t_dim_par,t_join.par as t_join_par"
        " from t0 "
        " join t_dim on t0.id=t_dim.id"
        " join t_join on t0.cnt=t_join.id"
    )
    # Print each micro-batch to the console and block until the query terminates.
    df1.writeStream.outputMode("append").format("console").option("truncate", False).start().awaitTermination()
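
The dimension queries above always read the partition with the largest `par` value, so whatever job refreshes a dimension table has to write each new snapshot into a partition whose value sorts above all earlier ones. Below is a minimal sketch of that publishing side. It assumes `par` is a string partition column holding a zero-padded timestamp (the original post does not state its format). The function names and the view name `dim_snapshot` are hypothetical and used only for illustration.

```python
from datetime import datetime


def new_partition_value(now: datetime) -> str:
    """Return a zero-padded timestamp string.

    Assumption: par is compared as a string, so a fixed-width format is
    needed for max(par) to pick the most recent snapshot.
    """
    return now.strftime("%Y%m%d%H%M%S")


def latest_partition_query(table: str, partition_col: str = "par") -> str:
    """Build the 'newest partition only' query used for t_dim and t_join."""
    return (
        f"select * from {table} "
        f"where {partition_col}=(select max({partition_col}) from {table})"
    )


def publish_statement(table: str, source_view: str, par: str,
                      partition_col: str = "par") -> str:
    """Build a Hive-style statement that writes a refreshed snapshot
    into a new partition (source_view is a hypothetical temp view)."""
    return (
        f"insert overwrite table {table} partition ({partition_col}='{par}') "
        f"select id from {source_view}"
    )


if __name__ == "__main__":
    par = new_partition_value(datetime.now())
    # Statement the refresh job would run to publish a new snapshot.
    print(publish_statement("test.t_20210708_dim", "dim_snapshot", par))
    # Query the streaming job uses to read that snapshot.
    print(latest_partition_query("test.t_20210708_dim"))
```

In a real refresh job the generated statement would presumably be passed to `spark.sql(...)`. The fixed-width format matters because an unpadded value such as `2021-7-8` would not compare correctly against `2021-12-1` as a string.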
    

      

  • Original article: https://www.cnblogs.com/muyue123/p/15016711.html