zoukankan      html  css  js  c++  java
  • python hive.py

    #!/usr/bin/env python
    # -- coding:utf-8 --

    import os
    import sys
    from subprocess import call

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    # Spark master URL (for example spark://spark:7077), read from the environment.
    master = os.environ.get("SPARK_MASTER_URL")

    # Build the session step by step: a method chain split over lines that
    # start with '.' is only valid inside parentheses.  The master is set only
    # when SPARK_MASTER_URL is defined, so an unset variable does not pass
    # None as the master URL (left to the launcher's configuration instead).
    _builder = SparkSession.builder.appName("hive").enableHiveSupport()
    if master:
        _builder = _builder.master(master)
    spark = _builder.getOrCreate()

    # Column names refresh_model() treats as timestamps, in priority order:
    # the first one present in a table becomes its repartition key.
    TIMESTAMP_COLUMNS = ['created', 'date', 'create', 'time', 'launchDate']


    def refresh_model(model):
        """Rewrite the table ``model`` repartitioned by its first timestamp column.

        The table is read and previewed with ``df.show()``.  If its schema has
        none of TIMESTAMP_COLUMNS, nothing else happens.  Otherwise the rows are
        repartitioned by the first matching column (in TIMESTAMP_COLUMNS order),
        written to a staging table, and the staging table replaces ``model``.

        :param model: table name (run() passes the SQL script's basename).
        """
        df = spark.sql('select * from {model}'.format(model=model))
        df.show()

        # Check the schema instead of a data row, and build a real list: on
        # Python 3 filter() returns a lazy object that is always truthy and
        # cannot be indexed.
        time_columns = [name for name in TIMESTAMP_COLUMNS if name in df.columns]
        if not time_columns:
            return

        # `df` is lazy and still reads from `model`'s data, so dropping `model`
        # before writing would delete the rows being copied.  Materialize into
        # a staging table first, then swap it in under the original name.
        staging = '{model}__refresh_tmp'.format(model=model)
        spark.sql('drop table if exists {staging}'.format(staging=staging))
        df.repartition(time_columns[0]).write.saveAsTable(staging)
        spark.sql('drop table if exists {model}'.format(model=model))
        spark.sql('alter table {staging} rename to {model}'.format(
            staging=staging, model=model))


    def run(filePath):
        """Execute one SQL script, then refresh its table if Hive ran it successfully.

        Scripts whose path contains 'postsql' are run with spark-sql; all others
        with the Hive CLI.  After a Hive run that exits with status 0, the table
        named after the script's basename (extension stripped) is passed to
        refresh_model().

        :param filePath: script path, relative to the current working directory
            or absolute.
        :returns: the executor's exit status.
        """
        spark_sql = '/data/spark-2.2.0-bin-hadoop2.7/bin/spark-sql'
        hive = '/data/apache-hive-2.1.1-bin/bin/hive'

        # Route on the caller's path, before the working directory is prepended,
        # so a 'postsql' somewhere in the cwd itself cannot misroute scripts.
        executor = spark_sql if 'postsql' in filePath else hive
        filePath = os.path.join(os.getcwd(), filePath)

        # Argument list, no shell: the executor is the program and the script is
        # a single '-f' argument, so spaces or shell metacharacters in file names
        # are passed through literally instead of being interpreted.
        status = call([executor, '-f', filePath])

        model = os.path.splitext(os.path.basename(filePath))[0]
        # `executor` holds the full path, so compare against it (not the bare
        # name 'hive'); skip the refresh when the script itself failed.
        if executor == hive and status == 0:
            print('model', model)
            refresh_model(model)
        return status


    if __name__ == '__main__':
        # With exactly one argument, run just that script; otherwise run every
        # file under sql/ and then postsql/ (relative to the working directory).
        if len(sys.argv) == 2:
            run(sys.argv[1])
        else:
            valid_dirs = ['sql', 'postsql']
            for directory in valid_dirs:
                for dirpath, dirnames, filenames in os.walk(directory):
                    # os.walk lists entries in arbitrary order; sort subdirectories
                    # (in place, which steers the walk) and files so scripts always
                    # run in the same name-based order.
                    dirnames.sort()
                    for filename in sorted(filenames):
                        run(os.path.join(dirpath, filename))

    这里主要用到 os.path.join()、os.walk() 和 os.getcwd() 这几个方法来遍历目录并拼接路径。

    注意一个地方的写法:

    call("{} -f {}".format(filePath, executor),shell=True)

    当然也可以写成subprocess.call("{} -f {}".format(filePath, executor),shell=True)

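    shell=True 是后来加上的。如果没有 shell=True,call("{} -f {}".format(filePath, executor)) 在 pipeline 创建的任务中执行时会报错:不使用 shell 时,传入的整条字符串(包括其中的空格)会被当作可执行文件名,系统找不到这个文件,于是抛出 OSError: [Errno 2] No such file or directory。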

    // Jenkins declarative pipeline: on an agent labelled 'spark', run hive.py
    // from /data/sftp/huoqiu/script.  hive.py gets no arguments here, so it
    // walks the sql/ and postsql/ directories under that working directory.
    pipeline {
        agent {label 'spark' }
        stages {
            stage('hive sql'){
                steps{
                    // dir(...) sets the working directory for the nested steps.
                    dir('/data/sftp/huoqiu/script'){
                        sh 'python hive.py'
                    }
                }
            }
        }
    }
    执行后会报类似下面的错误(以下是另一个脚本中同类错误的堆栈示例):
    Traceback (most recent call last):
      File "./marp.py", line 82, in <module>
        programs = [ subprocess.Popen(c) for c in commands ]
      File "/usr/lib/python2.6/subprocess.py", line 595, in __init__
        errread, errwrite)
      File "/usr/lib/python2.6/subprocess.py", line 1092, in _execute_child
        raise child_exception
    OSError: [Errno 2] No such file or directory
    解决方法就是:
    call("{} -f {}".format(filePath, executor),shell=True)
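    在最后加上 shell=True 后就不会再抛出异常。但要注意,这里 format 的两个参数顺序写反了:应当是执行器在前、SQL 文件在后,shell=True 只是消除了异常,并不能修正这个顺序。更稳妥的写法是传入参数列表,不经过 shell:call([executor, '-f', filePath])。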

  • 相关阅读:
    导出查询结果到excle
    导出所选行为excle
    spring security LDAP获取用户信息
    spring security防御会话伪造session攻击
    Linux安装Loadrunner generator
    Centos7 安装gitlab
    kafka 安装部署
    zookeeper 搭建
    Oracle GoldenGate对接 Oracle 11g和Kafka
    suse 11 sp4 设置yast 安装源
  • 原文地址:https://www.cnblogs.com/cuishuai/p/7838707.html
Copyright © 2011-2022 走看看