zoukankan      html  css  js  c++  java
  • yarn application -kill application_id yarn kill 超时任务脚本

    需求:kill 掉yarn上超时的任务,实现不同队列不同超时时间的kill机制,并带有任务名的白名单功能

    此为python脚本,可配置crontab使用

    # _*_ coding=utf-8 _*_
    # !/usr/bin/python
    import re
    import commands
    import time
    
    # Kill YARN applications that have been RUNNING longer than the limit of the
    # queue they were submitted to, skipping white-listed application names, and
    # send a DingTalk alert for every application that was killed.
    # Intended to be run periodically (e.g. from crontab); every run appends to
    # the log file opened below.

    # Local imports: only the shell/JSON quoting below needs them, so the
    # file-level import block does not have to change.
    import json
    import pipes

    run_app_arr = []      # (app_id, app_name, queue) of every monitored RUNNING app
    timeout_app_arr = []  # entries of run_app_arr that exceeded their queue's limit
    ONE_HOURE = 1         # limit, in hours, applied to the TEST_QUEUE_NAME queues
    THREE_HOURE = 3       # limit, in hours, applied to the ONLINE_QUEUE_NAME queues
    TEST_QUEUE_NAME = ['hue', 'etl-test']
    ONLINE_QUEUE_NAME = ['default']
    # Application names that must never be killed.
    KILL_WHITE_LIST = ['org.apache.spark.sql.hive.thriftserver.HiveThriftServer2']
    DINGDING_URL = 'xxx'
    # curl template: the first %s is the shell-quoted webhook URL, the second the
    # shell-quoted JSON body.
    ding_cmd = "curl %s -H 'Content-Type: application/json' -d %s"
    # Alert text: current time, app id, app name, app queue.
    ding_content = ('== YARN OVERTIME JOB KILL 告警 ==\n'
                    '\n'
                    ' 当前时间: %s \n'
                    ' kill_app_id: %s \n'
                    ' kill_app_name: %s \n'
                    ' kill_app_queue: %s ')
    f = None
    try:
        f = open('/home/hadoop/autokillhadoopjob/check_timeout_job.log', 'a')
        commond = '. /etc/profile && yarn application -list | grep "http://" |grep "RUNNING" |cut -f1,2,5'
        # Collect id, name and queue of every running job into run_app_arr.
        status, output = commands.getstatusoutput(commond)
        f.write('#' * 50 + '\n')
        f.write('=> start_time: %s \n' % (time.strftime('%Y-%m-%d %H:%M:%S')))
        if status == 0:
            for line in output.split('\n'):
                if not line.startswith('application_'):
                    continue
                app_line = line.split('\t')
                if len(app_line) < 3:
                    # Malformed row: skip it instead of aborting the whole run.
                    continue
                running_app_id = app_line[0].strip()
                running_app_name = app_line[1].strip()
                app_queue = app_line[2].strip()
                # Only applications in a monitored queue are candidates.
                if app_queue in TEST_QUEUE_NAME or app_queue in ONLINE_QUEUE_NAME:
                    run_app_arr.append((running_app_id, running_app_name, app_queue))
        else:
            f.write('yarn -list 执行失败. status: %s.\n' % (status))

        # Look up the start time of every candidate and record the ones that
        # have run past their queue's limit in timeout_app_arr.
        for running_app_id, running_app_name, running_app_queue in run_app_arr:
            commond = (". /etc/profile && yarn application -status "
                       + pipes.quote(running_app_id)
                       + " | grep 'Start-Time' | awk -F ':' '{print $2}'")
            status, output = commands.getstatusoutput(commond)
            if status != 0:
                f.write('yarn -status 执行失败. status: %s.\n' % (status))
                continue
            for line in output.split('\n'):
                start_timestamp = line.strip()
                if not start_timestamp.isdigit():
                    continue
                # Start-Time is in milliseconds; divide by a float so Python 2
                # does not truncate before the subtraction.
                elapsed_time = time.time() - int(start_timestamp) / 1000.0
                cost_time = round(elapsed_time / 60 / 60, 2)
                f.write('=> cost_time: %sh \n' % (cost_time))
                # White-listed applications are never killed.
                if running_app_name in KILL_WHITE_LIST:
                    continue
                if ((running_app_queue in TEST_QUEUE_NAME and cost_time > ONE_HOURE)
                        or (running_app_queue in ONLINE_QUEUE_NAME and cost_time > THREE_HOURE)):
                    f.write('=> timeout app => %s # %s # %s\n'
                            % (running_app_id, running_app_name, running_app_queue))
                    timeout_app_arr.append((running_app_id, running_app_name, running_app_queue))

        if not timeout_app_arr:
            f.write('=> no timeout job.\n')

        # Kill every timed-out job and send a DingTalk alert for each kill.
        for kill_app_id, kill_app_name, kill_app_queue in timeout_app_arr:
            commond = '. /etc/profile && yarn application -kill ' + pipes.quote(kill_app_id)
            status, output = commands.getstatusoutput(commond)
            if status != 0:
                f.write('=> kill app failed: %s # %s # %s.\n'
                        % (kill_app_id, kill_app_name, kill_app_queue))
                continue
            f.write('=> kill app sucessfully: %s # %s # %s.\n'
                    % (kill_app_id, kill_app_name, kill_app_queue))
            current_time = time.strftime('%Y-%m-%d %H:%M:%S')
            try:
                # Application names are chosen by whoever submitted the job, so
                # the JSON body is built with json.dumps and both curl arguments
                # are shell-quoted; a quote character in a name can then neither
                # break the payload nor inject shell commands.
                payload = json.dumps({
                    'msgtype': 'text',
                    'text': {
                        'content': ding_content % (current_time, kill_app_id,
                                                   kill_app_name, kill_app_queue),
                    },
                })
                cmd = ding_cmd % (pipes.quote(DINGDING_URL), pipes.quote(payload))
                alert_status, alert_output = commands.getstatusoutput(cmd)
                if alert_status != 0:
                    f.write('=> dingding alert failed: %s. status: %s.\n'
                            % (kill_app_id, alert_status))
            except ValueError as alert_error:
                # json.dumps raises a ValueError subclass when the name is not
                # valid UTF-8; a failed alert must not stop the remaining kills.
                f.write('=> dingding alert failed: %s. error: %s.\n'
                        % (kill_app_id, alert_error))

        f.write('=> stop_time: %s \n' % (time.strftime('%Y-%m-%d %H:%M:%S')))

    except Exception as e:
        if f is None:
            # The log file itself could not be opened, so there is nowhere to
            # record the error; re-raise instead of masking it with an
            # AttributeError on None.
            raise
        f.write('=> Exception: %s \n' % (str(e)))
    finally:
        if f is not None:
            f.close()
    
    
  • 相关阅读:
    Python装饰器的解包装(unwrap)
    《Python cookbook》 “定义一个属性可由用户修改的装饰器” 笔记
    关于Python的函数(Method)与方法(Function)
    判断python对象是否可调用的三种方式及其区别
    tornado返回指定的http code
    Mac下安装pymssql
    iptables
    OpenFlow通信流程解读
    Prometheus的架构及持久化
    ansible总结
  • 原文地址:https://www.cnblogs.com/jiangxiaoxian/p/9648545.html
Copyright © 2011-2022 走看看