zoukankan      html  css  js  c++  java
  • python通过kylin的api调度cube

    #!/usr/local/python/bin/python
    # coding=utf-8
    
    import sys
    import os
    import datetime
    import time
    import pytz
    import subprocess
    
    
    # 获取并检查外部传入参数
    if (len(sys.argv)==5):
        RunType = sys.argv[1]
        bizDate1 = sys.argv[2]
        bizDate2 = sys.argv[3]
        CubeName = sys.argv[4]
        if len(bizDate1) !=8 or len(bizDate2) !=8 :
           print "业务日期传参非法*********" 
           sys.exit(1)
        # 类型支持传入2个时间参数
        if RunType == '2':
           print "kylin跑批时间为区间时间点"
           # 日期转换成10位 YYYY-MM-DD
           bizDate11 = bizDate1[0:4] +"-" + bizDate1[4:6] + "-" + bizDate1[6:8]
           StartTime = bizDate11 + " 00:00:00"
           bizDate12 = bizDate2[0:4] +"-" + bizDate2[4:6] + "-" + bizDate2[6:8]
           EndTime = bizDate12 + " 00:00:00"
           print "StartTime: " + StartTime
           print"EndTime: " + EndTime
        else:
           print "脚本传参错误,python a.py RunType StartTime EndTime CubeName 请检查!"
           sys.exit(1)
        print"CubeName: " + CubeName
    
    
    # UTC时间和本地时间相差8小时
    # 时间字符串加8小时
    def timeadd8_hour(timestring):
         # 转为时间戳
         timestamp = time.mktime(time.strptime(timestring, '%Y-%m-%d %H:%M:%S'))
         datetime_struct = datetime.datetime.fromtimestamp(timestamp)
         datetime_obj = (datetime_struct + datetime.timedelta(hours=8))
         datetime_str = datetime_obj.strftime('%Y-%m-%d %H:%M:%S')
         return datetime_str
    
    # 获取UTC时间
    def local_to_utc(local_ts, utc_format='%Y-%m-%dT%H:%MZ'):
        local_tz = pytz.timezone('Asia/Chongqing')
        local_format = "%Y-%m-%d %H:%M:%S"
        time_str = time.strftime(local_format, time.localtime(local_ts))
        dt = datetime.datetime.strptime(time_str, local_format)
        local_dt = local_tz.localize(dt, is_dst=None)
        utc_dt = local_dt.astimezone(pytz.utc)
        return utc_dt.strftime(utc_format)
    
    # UTCS时间转换为时间戳 
    def utc_to_local(utc_time_str, utc_format='%Y-%m-%dT%H:%MZ'):
        local_tz = pytz.timezone('Asia/Chongqing') 
        local_format = "%Y-%m-%d %H:%M"
        utc_dt = datetime.datetime.strptime(utc_time_str, utc_format)
        local_dt = utc_dt.replace(tzinfo=pytz.utc).astimezone(local_tz)
        return int(time.mktime(local_dt.timetuple()))
    
    
    def paraDate(StartTime,EndTime):
        StartTime = timeadd8_hour(StartTime)
        print "开始时间(+8)" + StartTime
        EndTime = timeadd8_hour(EndTime)
        print "结束时间(+8)" + EndTime
        # 跑批时间开始时间戳
        StartTimeArray=int(time.mktime(time.strptime(StartTime, '%Y-%m-%d %H:%M:%S')))
        print "跑批开始时间戳:" + str(StartTimeArray)
        StartTimeArray = local_to_utc(StartTimeArray)
        print "UTC开始:" + str(StartTimeArray)
        StartTimeArray = utc_to_local(StartTimeArray)
        print "UTC开始时间戳:" + str(StartTimeArray)
        print StartTimeArray
        # 跑批时间结束时间戳
        EndTimeArray=int(time.mktime(time.strptime(EndTime, '%Y-%m-%d %H:%M:%S')))
        print "跑批结束时间戳:" + str(EndTimeArray)
        EndTimeArray = local_to_utc(EndTimeArray)
        print "UTC结束:" + str(EndTimeArray)
        EndTimeArray = utc_to_local(EndTimeArray)
        print "UTC结束时间戳:" + str(EndTimeArray)
        print EndTimeArray
        # 请求参数
        Para_Data =  "'{" + '"' + "startTime" + '"' + ":'" + str(StartTimeArray)+ "000'," + '"' + 'endTime":' + "'" + str(EndTimeArray) + "000', " + '"' + "buildType" + '":"' + "BUILD" + '"'+ "}'"
        print "请求参数为: " + Para_Data
        return Para_Data
    
    def Curl_Put():
        Para_Data = paraDate(StartTime,EndTime)
        # 认证信息
        Authen ="-H " + '"' + "Authorization: Basic QURNSU46S1lMSU4=" + '"' + " -H 'Content-Type: application/json'"
        # Url 信息
        CurlValue = " http://xx.xx.xx.xx:7070/kylin/api/cubes/" + CubeName + "/rebuild"
        # 请求命令
        CurlPut = "curl -X PUT " + Authen.replace('
    ','') + " -d " + Para_Data + CurlValue
        print CurlPut
        return CurlPut
    
    #  发送请求
    def CrlPut(CurlPut):
        print CurlPut
        Cmd1 = subprocess.Popen(CurlPut,shell=True,stderr=subprocess.PIPE,stdout=subprocess.PIPE)
        out,err = Cmd1.communicate()
        uuid = out.split('uuid')[1].split('",')[0].replace('"','').replace(':','')
        print "uuid: " + uuid
        return uuid
    
    # 接收作业状态
    def JobStatus(uuid):
        # 认证信息
        Authen ="-H " + '"' + "Authorization: Basic QURNSU46S1lMSU4=" + '"' + " -H 'Content-Type: application/json'"
        CurlValue2 = " http://xx.xx.xx.xx:7070/kylin/api/jobs/" + uuid
        CurlGut = "curl -X GET " + Authen.replace('
    ','')  + CurlValue2
        print CurlGut
        Cmd2 = subprocess.Popen(CurlGut,shell=True,stderr=subprocess.PIPE,stdout=subprocess.PIPE)
        out,err = Cmd2.communicate()
        JobStatus = out.split('job_status')[1].split('",')[0].replace('"','').replace(':','')
        print JobStatus
        return JobStatus
    
    # 主程序
    if __name__ == "__main__":
       NUM = 0
       flag = 0
       CurlPut = Curl_Put()
       # 发送请求获取UUID
       uuid = CrlPut(CurlPut)
       while True:
           NUM += 1
           print "{1}---执行第{0}次检测执行状态。".format(NUM, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
           try:
               # 获取作业状态
               chk_result = JobStatus(uuid)
               #作业状态判断
               if chk_result == 'PENDING':
                  print '任务在加载状态中...'
                  time.sleep(10)
               elif chk_result == 'RUNNING':
                  print '任务正在执行中...'
                  time.sleep(10)
               elif chk_result == 'FINISHED':
                  print '任务已成功执行完成!'
                  break
               else:
                   print '未知状态,请检查!'
                   sys.exit(1)
           except Exception as e:
               print e.message
               sys.exit(1)

    运行参数: python exec_kylin.py 2 20200819 20200820 kylintest

    本文参考:

    http://kylin.apache.org/cn/docs/howto/howto_build_cube_with_restapi.html

    http://kylin.apache.org/cn/docs/howto/howto_use_restapi.html

  • 相关阅读:
    tensorflow学习笔记----TensorBoard讲解
    tensorflow学习笔记----tensorflow在windows的安装及TensorBoard中mnist样例
    vmware workstation环境下虚拟机固定ip的设置
    一致性协议之Paxos算法
    一致性协议之二阶段提交和三阶段提交
    红黑树
    kibana增加验证
    Linux安装nodejs和npm
    Gnu pgp加密解密
    linux记录每次登陆的历史命令
  • 原文地址:https://www.cnblogs.com/hello-wei/p/13968257.html
Copyright © 2011-2022 走看看