zoukankan      html  css  js  c++  java
  • python3检测ossfs可用性+钉钉通知

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2019-12-02 15:16
    # @Author  : Anthony
    # @Email   : ianghont7@163.com
    # @File    : check-ossfs.py
    
    # 检测ossfs进程是否存在
    # 检测/xxxx/file是否挂载成功
    # ossfs可用性检测
    
    import psutil
    import requests
    import json
    import subprocess
    import threading
    
    class DingTalk_Base:
        def __init__(self):
            self.__headers = {'Content-Type': 'application/json;charset=utf-8'}
            self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
        def send_msg(self,text):
            json_text = {
                "msgtype": "text",
                "text": {
                    "content": text
                },
                "at": {
                    "atMobiles": [
                        ""
                    ],
                    "isAtAll": False
                }
            }
            return requests.post(self.url, json.dumps(json_text), headers=self.__headers).content
    class DingTalk_Disaster(DingTalk_Base):
        def __init__(self):
            super().__init__()
            # 填写机器人的url
            self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"
    
    def check_oss_pid():
        pids = psutil.pids()
        for pid in pids:
            p = psutil.Process(pid)
            if "ossfs" == p.name().strip():
                return True
        else:
            return ding.send_msg("""ossfs报警通知:
    机器:机器名称
    消息内容:ossfs进程不存在,请及时处理...""")
    
    
    
    def get_ossfs_name():
        try:
            args = "ls -lh /alidata/|grep file|awk '{print $3}'"
            running_shell = subprocess.Popen(args,
                                             shell=True,
                                             stdin=subprocess.PIPE,
                                             stdout=subprocess.PIPE,
                                             stderr=subprocess.PIPE,)
            out,err = running_shell.communicate()
            for line in out.splitlines():
                s1 = bytes.decode(line).strip()
                if s1.strip() != "root":
                    return ding.send_msg("""ossfs报警通知:
    机器:语文课堂
    消息内容:/xxxx/file/挂载失败,请及时处理...""")
        except Exception as e:
            print(e)
    
    if __name__ == "__main__":
        ding = DingTalk_Disaster()
        threads = [threading.Thread(target=get_ossfs_name),
                   threading.Thread(target=check_oss_pid)]
        for t in threads:
            t.start()
  • 相关阅读:
    The Snail
    Oil Deposits
    杭电3784(继续xxx定律)
    poj 2395 Out of Hay
    poj 2485 Highways(简单题)
    poj 2560 || 杭电1162
    Rescue
    “中国芯”能抗衡英特尔吗?
    2013,中国计算巨头放眼国际市场
    123063天两度瘫痪:为啥不在淘宝上卖火车票?
  • 原文地址:https://www.cnblogs.com/ipyanthony/p/11989214.html
Copyright © 2011-2022 走看看