Requirement: monitor several important websites (multithreaded). On an access failure, retry twice; from the third failure onwards, start alerting.
Logging module
```python
import logging
import logging.config

# Load the logging configuration and create the module-level logger
LOGCONF_FILENAME = "../etc/logging.conf"
logging.config.fileConfig(LOGCONF_FILENAME)
logger = logging.getLogger('wj')
```
The logging.conf configuration file (rotates the log daily and automatically deletes expired log files)
```ini
###############################################
[loggers]
keys=root,wj

[logger_root]
level=DEBUG
handlers=hand01

[logger_wj]
handlers=hand02
qualname=wj
propagate=0

###############################################
[handlers]
keys=hand01,hand02

[handler_hand01]
class=StreamHandler
level=DEBUG
formatter=formatter02
args=(sys.stdout,)

[handler_hand02]
class=handlers.TimedRotatingFileHandler
level=DEBUG
formatter=formatter02
args=('../log/portal_eastmonitor.log','midnight',1,30)

###############################################
[formatters]
keys=formatter01,formatter02

[formatter_formatter01]
format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

[formatter_formatter02]
format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
datefmt=
```
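For reference, the hand02 handler above corresponds to the following programmatic setup. This is a minimal sketch, not part of the original project; the file path, format string and logger name are simply taken from the config above:

```python
import logging
import logging.handlers

# Programmatic equivalent of [handler_hand02]: rotate at midnight,
# once per day, keeping at most 30 old log files.
logger = logging.getLogger('wj')
logger.setLevel(logging.DEBUG)

handler = logging.handlers.TimedRotatingFileHandler(
    '../log/portal_eastmonitor.log', when='midnight', interval=1, backupCount=30)
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
```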
HTTP access module
```python
import pycurl
import StringIO
import certifi

# Initialise a pycurl.Curl handle with common options
def initCurl():
    c = pycurl.Curl()
    c.setopt(pycurl.CONNECTTIMEOUT, 10)   # connection timeout
    c.setopt(pycurl.TIMEOUT, 15)          # download timeout
    #c.setopt(pycurl.COOKIEFILE, "tradeh5_cookie")
    #c.setopt(pycurl.COOKIEJAR, "tradeh5_cookie")
    c.setopt(pycurl.FOLLOWLOCATION, 0)
    c.setopt(pycurl.MAXREDIRS, 5)
    c.setopt(pycurl.SSL_VERIFYPEER, 0)
    c.setopt(pycurl.SSL_VERIFYHOST, 0)
    c.setopt(pycurl.CAINFO, certifi.where())
    c.setopt(pycurl.NOSIGNAL, 1)
    return c

# Generic HTTP GET helper: fetch url with the given curl handle and return the response body
def GetData(curl, url):
    head = ['Accept:*/*',
            'User-Agent:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)']
    buf = StringIO.StringIO()
    curl.setopt(pycurl.WRITEFUNCTION, buf.write)
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.HTTPHEADER, head)
    curl.perform()
    the_page = buf.getvalue()
    buf.close()
    return the_page
```
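A quick usage sketch of the two helpers (the URL is just a placeholder, and the snippet is Python 2 like the rest of the code):

```python
# Minimal usage of initCurl/GetData defined above
c = initCurl()
page = GetData(c, 'https://www.example.com/')   # placeholder URL
print len(page), c.getinfo(pycurl.HTTP_CODE), c.getinfo(pycurl.TOTAL_TIME)
c.close()
```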
Main program; the monitored site URLs are user-defined
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import pycurl
import ssl
import os
import sys
import re
import threading

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # absolute path of the project root
sys.path.append(BASE_DIR)  # add it to sys.path so the modules below can be imported

from modules.log_config import *
from modules.http_func import *
from functools import wraps

local = threading.local()
q = threading.Lock()
event = threading.Event()


def retry(MyException, tries=1, delay=10, total=8):
    """
    :param MyException: exception type that triggers a retry / alert
    :param tries: initial retry counter
    :param delay: delay between retries (seconds)
    :param total: total number of attempts
    :return:
    """
    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay, mtotal = tries, delay, total
            while mtries < mtotal + 1:
                try:
                    return f(*args, **kwargs)
                except MyException as (node_id, host_id, response_time, log_time,
                                       monitor_ip, pname, monitor_account, err, desc, region):
                    if mtries < 3:
                        # first two failures: log and retry, no alert yet
                        logger.error("Retry count: %s, error: %s" % (mtries, err))
                        mtries += 1
                        time.sleep(delay)
                    elif 3 <= mtries <= 7:
                        # from the third failure onwards: keep alerting and retrying
                        logger.error("Retry count: %s, error: %s" % (mtries, err))
                        time.sleep(delay)
                        mtries += 1
                    else:
                        mtries += 1
            return False
        return f_retry
    return deco_retry


class Portal_monitor(object):
    """
    run: entry point for one monitored URL
    http_access: fetch the monitored page
    update_mysql: after a successful fetch, reset the alert flag (alert recovery)
    """
    def __init__(self):
        pass

    def run(self, url):
        self.http_access(url)

    @retry(Exception)
    def http_access(self, url):
        try:
            c = initCurl()
            a = threading.currentThread().getName()
            req = GetData(c, url)
            http_total_time = c.getinfo(pycurl.TOTAL_TIME)
            backstatus = c.getinfo(pycurl.HTTP_CODE)
            coasttime = str(http_total_time * 1000) + 'ms'
            if not req:
                raise Exception
            logger.debug("[%s][%s][%s]: response time: %s || status: %s"
                         % (portal_dict[url], url, a, coasttime, backstatus))
        except Exception as e:
            # keep only letters and digits in the error text, drop special characters
            rule = re.compile(r'[^a-zA-Z0-9]')
            desc = rule.sub(' ', str(e))
            err = '[Access error for %s] %s, %s' % (url, portal_dict[url], desc)
            logger.error(err)
            log_time = int(time.time())
            node_id = '28'
            raise Exception(node_id, sql_prefixs[url], '', log_time, monitor_ip,
                            pname, '', err, portal_dict[url], '0')


def run():
    portal = Portal_monitor()
    threads = []
    logger.debug('*************************************************************************************************')
    for url in portal_dict:
        threads.append(threading.Thread(target=portal.run, args=(url,)))  # one thread per monitored URL
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()  # wait for all worker threads before the main thread continues
    logger.debug('*************************************************************************************************')


def pid():
    pid = os.getpid()
    fp = file("../var/pid", "wt")
    fp.write("%d" % pid)
    fp.close()


if __name__ == '__main__':
    # disable ssl certificate verification
    ssl._create_default_https_context = ssl._create_unverified_context
    pid()
    while True:
        run()
        time.sleep(30)
```
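The main program uses portal_dict, sql_prefixs, monitor_ip and pname without defining them; they presumably come from the imported modules or a config file that is not shown. A hypothetical sketch of their shape, inferred from how the code uses them (all names and values here are illustrative, not the author's actual configuration):

```python
# Hypothetical config values assumed by the main program above (illustrative only)
portal_dict = {
    'https://www.example.com/': 'Example portal',        # url -> human-readable site name
    'https://portal.example.org/': 'Another portal',
}
sql_prefixs = {
    'https://www.example.com/': 'host_001',              # url -> host identifier used in alerts
    'https://portal.example.org/': 'host_002',
}
monitor_ip = '10.0.0.1'      # IP of the machine running the monitor
pname = 'portal_monitor'     # process/monitor name reported with the alert
```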
The threading module defines two kinds of locks: threading.Lock and threading.RLock. There is a subtle difference between them, illustrated by comparing the two snippets below:
```python
import threading
lock = threading.Lock()  # a Lock object
lock.acquire()
lock.acquire()  # deadlock: the second acquire blocks forever
lock.release()
lock.release()
```
```python
import threading
rLock = threading.RLock()  # an RLock object
rLock.acquire()
rLock.acquire()  # within the same thread, this does not block
rLock.release()
rLock.release()
```
The main difference between the two locks is that an RLock may be acquired multiple times by the same thread, while a Lock may not. Note that with an RLock, acquire and release must come in pairs: if acquire is called n times, release must also be called n times before the lock is actually freed. A short sketch of when this matters is shown below.
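A minimal sketch (the Counter class is hypothetical, not from the original post) of why reentrancy matters: a method that holds the lock calls another method that takes the same lock.

```python
import threading

# Hypothetical example: with threading.Lock() instead of RLock(),
# add_twice() would deadlock, because add() tries to re-acquire a lock
# the same thread already holds.
class Counter(object):
    def __init__(self):
        self.lock = threading.RLock()
        self.value = 0

    def add(self, n):
        with self.lock:          # acquire (possibly re-entrantly)
            self.value += n      # release happens automatically

    def add_twice(self, n):
        with self.lock:          # outer acquire
            self.add(n)          # inner acquire of the same RLock succeeds
            self.add(n)

c = Counter()
c.add_twice(5)
print c.value   # 10
```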
threading.Condition
A Condition can be thought of as an advanced lock: it provides functionality beyond Lock and RLock and lets us handle more complex thread-synchronisation problems. threading.Condition maintains a lock object internally (an RLock by default), and a lock object can be passed in when the Condition is created. Condition also provides acquire and release methods with the same meaning as on a lock; they simply delegate to the corresponding methods of the internal lock. In addition, Condition provides the methods below (note: they may only be called while the lock is held, i.e. after acquire; otherwise a RuntimeError is raised). A minimal producer/consumer sketch follows the method descriptions.
Condition.wait([timeout]):
wait releases the internally held lock and suspends the calling thread until it is woken by a notification or until the timeout expires (if a timeout was given). Execution continues only once the thread has been woken and has re-acquired the lock.
Condition.notify():
Wakes up one suspended thread, if any. Note: notify() does not release the lock it holds.
Condition.notify_all() / Condition.notifyAll():
Wakes up all suspended threads, if any. Note: these methods do not release the lock they hold either.
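A minimal producer/consumer sketch using Condition (hypothetical code, not from the original post): the consumer waits until the producer has put an item into a shared list.

```python
import threading
import time

items = []
cond = threading.Condition()   # wraps an RLock internally

def consumer():
    with cond:                 # acquire the condition's lock
        while not items:       # guard against spurious wake-ups
            cond.wait()        # releases the lock while waiting, re-acquires on wake-up
        print 'consumed', items.pop(0)

def producer():
    time.sleep(1)
    with cond:
        items.append('job-1')
        cond.notify()          # wake one waiting consumer; lock is held until the with-block ends

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start(); t2.start()
t1.join(); t2.join()
```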