zoukankan      html  css  js  c++  java
  • threading使用心得

    需求:监控多个重要网站(多线程),出现访问异常重试2次,第三次开始告警。

    日志模块

    import logging
    import logging.config
    
    # 日志配置
    LOGCONF_FILENAME = "../etc/logging.conf"
    logging.config.fileConfig(LOGCONF_FILENAME)
    logger = logging.getLogger('wj')
    

     日志配置logging.conf文件(按天自动备份日志,自动删除过期日志)

    ###############################################
    [loggers]
    keys=root,wj
    
    [logger_root]
    level=DEBUG
    handlers=hand01
    
    [logger_wj]
    handlers=hand02
    qualname=wj
    propagate=0
    
    ###############################################
    [handlers]
    keys=hand01,hand02
    
    [handler_hand01]
    class=StreamHandler
    level=DEBUG
    formatter=formatter02
    args=(sys.stdout,)
    
    [handler_hand02]
    class=handlers.TimedRotatingFileHandler
    level=DEBUG
    formatter=formatter02
    args=('../log/portal_eastmonitor.log','midnight',1,30)
    
    ###############################################
    [formatters]
    keys=formatter01,formatter02
    
    [formatter_formatter01]
    format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
    datefmt=%Y-%m-%d %H:%M:%S
    
    [formatter_formatter02]
    format=%(asctime)s - %(process)d - %(thread)d - %(module)s - %(levelname)s - %(message)s
    datefmt=
    

     http访问模块

    import pycurl
    import StringIO
    import certifi
    
    
    # 初始化Pycurl类
    def initCurl():
        c = pycurl.Curl()
        c.setopt(pycurl.CONNECTTIMEOUT, 10)  # 链接超时
        c.setopt(pycurl.TIMEOUT, 15)  # 下载超时
        #c.setopt(pycurl.COOKIEFILE, "tradeh5_cookie")
        #c.setopt(pycurl.COOKIEJAR, "tradeh5_cookie")
        c.setopt(pycurl.FOLLOWLOCATION, 0)
        c.setopt(pycurl.MAXREDIRS, 5)
        c.setopt(pycurl.SSL_VERIFYPEER, 0)
        c.setopt(pycurl.SSL_VERIFYHOST, 0)
        c.setopt(pycurl.CAINFO, certifi.where())
        c.setopt(pycurl.NOSIGNAL, 1)
        return c
    
    
    # 封装了通用Http Get方法
    def GetData(curl, url):
        head = ['Accept:*/*', 'User-Agent:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)']
        buf = StringIO.StringIO()
        curl.setopt(pycurl.WRITEFUNCTION, buf.write)
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.HTTPHEADER, head)
        curl.perform()
        the_page = buf.getvalue()
        buf.close()
        return the_page
    

     主程序,url自定义网站地址

    #/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import time
    import pycurl
    import ssl
    import os
    import sys
    import re
    import threading
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # 取绝对路径
    sys.path.append(BASE_DIR)  # 将目录加入系统环境变量,之后导入此目录下各模块不会提示找不到
    from modules.log_config import *
    from modules.http_func import *
    from functools import wraps
    
    local = threading.local()
    q = threading.Lock()
    event = threading.Event()
    
    
    def retry(MyException, tries=1, delay=10, total=8):
        """
        :param MyException: 告警内容
        :param tries: 告警重试初始值
        :param delay: 告警重试间隔
        :param total: 告警重试总数
        :return:
        """
        def deco_retry(f):
            @wraps(f)
            def f_retry(*args, **kwargs):
                mtries, mdelay, mtotal = tries, delay, total
                while mtries < mtotal + 1:
                    try:
                        return f(*args, **kwargs)
                    except MyException as (node_id, host_id, response_time, log_time, monitor_ip, pname, monitor_account, err, desc, region):
                        if mtries < 3:
                            logger.error("重试次数:%s, 错误内容:%s" % (mtries, err))
                            mtries += 1
                            time.sleep(delay)
                        elif 3 <= mtries <= 7:
                            logger.error("重试次数:%s, 错误内容:%s" % (mtries, err))
                            time.sleep(delay)
                            mtries += 1
                        else:
                            mtries += 1
                            return False
            return f_retry
        return deco_retry
    
    
    class Portal_monitor(object):
        """
        run: 主运行方法
        http_access: 访问重要站点页面
        update_mysql: 站点页面访问成功后,更新告警数据标志位,设置为恢复告警
        """
    
        def __init__(self):
            pass
    
        def run(self, url):
            self.http_access(url)
    
        @retry(Exception)
        def http_access(self, url):
            try:
                c = initCurl()
                a = threading.currentThread().getName()
                req = GetData(c, url)
                http_total_time = c.getinfo(pycurl.TOTAL_TIME)
                backstatus = c.getinfo(pycurl.HTTP_CODE)
                coasttime = str(http_total_time*1000) + 'ms'
                if not req:
                    raise Exception
                logger.debug("[%s][%s][%s]:响应时间:%s || 返回状态:%s"
                            % (portal_dict[url], url, a, coasttime, backstatus))
            except Exception as e:
                # 异常信息只保留字母和数字,特殊字符去掉
                rule = re.compile(r'[^a-zA-z0-9]')
                desc = rule.sub(' ', str(e))
                err = '[访问:%s异常]%s, %s' % (url, portal_dict[url], desc)
                logger.error(err)
                log_time = int(time.time())
                node_id = '28'
                raise Exception(node_id, sql_prefixs[url], '', log_time,
                                monitor_ip, pname, '', err, portal_dict[url], '0')
    
    
    def run():
        portal = Portal_monitor()
        threads = []
        logger.debug('*************************************************************************************************')
        for url in portal_dict:
            threads.append(threading.Thread(target=portal.run, args=(url,)))  # 创建threads数组
        for t in threads:
            t.setDaemon(True)
            t.start()
        for t in threads:
            t.join()  # 等待子线程执行完成,才去执行主线程
        logger.debug('*************************************************************************************************
    ')
    
    
    def pid():
        pid = os.getpid()
        fp = file("../var/pid", "wt")
        fp.write("%d" % pid)
        fp.close()
    
    if __name__ == '__main__':
    
        # 关闭ssl证书验证
        ssl._create_default_https_context = ssl._create_unverified_context
    
        pid()
    
        while True:
            run()
            time.sleep(30)
    

    在threading模块中,定义两种类型的锁:threading.Lock和threading.RLock。它们之间有一点细微的区别,通过比较下面两段代码来说明:

    1. import threading
    2. lock = threading.Lock() #Lock对象
    3. lock.acquire()
    4. lock.acquire() #产生了死锁。
    5. lock.release()
    6. lock.release()
    1. import threading
    2. rLock = threading.RLock() #RLock对象
    3. rLock.acquire()
    4. rLock.acquire() #在同一线程内,程序不会堵塞。
    5. rLock.release()
    6. rLock.release()

       这两种琐的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire 和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。 threading.Condition

       可以把Condiftion理解为一把高级的锁,它提供了比Lock,RLock更高级的功能,允许我们能够控制复杂的线程同步问题。 threadiong.Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把锁对象作为参数传入。 Condition也提供了acquire,release方法,其含义与锁的acquire,release方法一致,其实它只是简单的调用内部锁对象 的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用锁(acquire)之后才能调用,否则将会报 RuntimeError异常。):

    Condition.wait([timeout]):  

      wait方法释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有锁的时候,程序才会继续执行下去。

    Condition.notify():

      唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的锁。

    Condition.notify_all()
    Condition.notifyAll()

      唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的锁。

  • 相关阅读:
    NetworkX-根据权重画图
    Matplotlib 画廊
    NetworkX-画图
    NetworkX-simple graph
    python+networkx
    AttributeError: 'module' object has no attribute 'X509_up_ref'
    python Flask post 数据 输出
    windows环境下批处理实现守护进程
    supervisor自启动
    支持高并发的IIS Web服务器常用设置
  • 原文地址:https://www.cnblogs.com/shhnwangjian/p/5481431.html
Copyright © 2011-2022 走看看