zoukankan      html  css  js  c++  java
  • python筛选关键字---error

    初始版本

    过滤错误日志,将错误日志写入过滤日志文件中;

    import os
    import json
    import logging
    import re
    
    name = '日志文件名'
    path = '日志路径'
    # 拼接要过滤日志文件的路径
    file_path = os.path.join(path, name)
    # 拼接要输出日志文件的路径
    LogPath = os.path.join(path, "输出日志名")
    # 日志输出等级(DEBUG,INFO,ERROR...)如果需要修改日志格式,加format='%(message)s'就可以改成没有info:root
    logging.basicConfig(filename=LogPath, level=logging.DEBUG) 
    # 清空日志文件内容
    file = open(LogPath, 'w').close()
    
    # 输入json格式自定义类型
    class Employee(object):
        def __init__(self, SYS_NO, O_TIME, ACTION, LOG_TYPE, RESULT, SPENT_TIME, ERROR_MSG):
            self.SYS_NO = SYS_NO
            self.O_TIME = O_TIME
            self.ACTION = ACTION
            self.LOG_TYPE = LOG_TYPE
            self.RESULT = RESULT
            self.SPENT_TIME = SPENT_TIME
            self.ERROR_MSG = ERROR_MSG
    
        def obj_json(self, obj_instance):
            return {
                'SYS_NO': obj_instance.SYS_NO,
                'O_TIME': obj_instance.O_TIME,
                'ACTION': obj_instance.ACTION,
                'LOG_TYPE': obj_instance.LOG_TYPE,
                'RESULT': obj_instance.RESULT,
                'SPENT_TIME': obj_instance.SPENT_TIME,
                'ERROR_MSG': obj_instance.ERROR_MSG,
            }
    
    # 读取日志文件,然后写入err_log文件
    def read_log(url, keyword):
        with open(file_path, 'r+', encoding='utf-8') as f:
            count = 0
            for line in f:   # 如果需要读取行号数字,改成 for (num,line) in enumerate(f):  这里就将行号获取到:num,需要展示的话,在自定义json中加入NUM对应的字段
                if keyword in line:    
                    # 输出日志切割,多符号切分
                    test1 = re.split(',| ', line)
                    time = test1[0] + ' ' + test1[1] + ':' + test1[2]
                    sy_no = None
                    act = None
                    log_type = test1[3]
                    res = test1[4]
                    spent_time = None
                    # error_msg = test1[5] + '' +test1[6]+ '' +test1[7]+ '' +test1[8]
                    # 判断字段长度,防止报错,也可以try...except...
                    if len(test1) <= 8:
                        error_msg = os.path.join(test1[5], test1[6], test1[7])
                    else:
                        error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8])
                    # 调用函数,传入对应字段
                    emp = Employee(sy_no, time, act, log_type, res, spent_time, error_msg)
                    # 序列化转换为json格式
                    data = json.dumps(emp, default=emp.obj_json, ensure_ascii=False)
                    print(data)
                    # 日志输出
                    logging.info(data)
    
                count += line.count(keyword)
        return count
    
    
    sum = 0
    num = read_log(file_path, 'ERROR')
    sum += num
    print('关键字总个数: ' + str(sum))
     

    updata 版本

    新增了对文件实时监测筛选错误日志打印;

    新增对上报接口http请求调用;

    import os
    import json
    import logging
    import re
    import requests
    import time
    
    name = '日志文件名'
    path = '日志路径'
    file_path = os.path.join(path, name)
    LogPath = os.path.join(path, "输出日志名")
    logging.basicConfig(filename=LogPath, level=logging.DEBUG,format='%(message)s')
    # 向http发送请求地址
    url_post = 'http请求接口地址'
    
    file = open(LogPath, 'w').close()
    
    last_data_err_time = ''
    
    # json格式自定义类型
    class Employee(object):
        def __init__(self, SYS_NO, O_TIME, ACTION, LOG_TYPE, RESULT, SPENT_TIME, ERROR_MSG):
            self.SYS_NO = SYS_NO
            self.O_TIME = O_TIME
            self.ACTION = ACTION
            self.LOG_TYPE = LOG_TYPE
            self.RESULT = RESULT
            self.SPENT_TIME = SPENT_TIME
            self.ERROR_MSG = ERROR_MSG
    
        def obj_json(self, obj_instance):
            return {
                'SYS_NO': obj_instance.SYS_NO,
                'O_TIME': obj_instance.O_TIME,
                'ACTION': obj_instance.ACTION,
                'LOG_TYPE': obj_instance.LOG_TYPE,
                'RESULT': obj_instance.RESULT,
                'SPENT_TIME': obj_instance.SPENT_TIME,
                'ERROR_MSG': obj_instance.ERROR_MSG,
            }
    
    
    # 读取日志文件,然后写入log文件
    def read_log(url, keyword):
        with open(file_path, 'r+', encoding='utf-8') as f:
            count = 0
            for line in f:
                if keyword in line:
                    # 输出日志切割
                    test1 = re.split(',| ', line)
                    time = test1[0] + ' ' + test1[1] + ':' + test1[2]
                    sy_no = None
                    act = None
                    log_type = test1[3]
                    res = test1[4]
                    spent_time = None
                    # error_msg = test1[5] + '' +test1[6]+ '' +test1[7]+ '' +test1[8]
                    # 判断字段长度,防止报错,也可以try...except...
                    if len(test1) <= 8:
                        error_msg = os.path.join(test1[5], test1[6], test1[7])
                    elif len(test1) <= 9:
                        error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8])
                    elif len(test1) <= 10:
                        error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8],test1[9])
                    else:
                        error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8],test1[9],test1[10])
                    # 调用函数,传入对应字段
                    emp = Employee(sy_no, time, act, log_type, res, spent_time, error_msg)
                    # 序列化转换为json格式
                    data = json.dumps(emp, default=emp.obj_json, ensure_ascii=False)
                    # print(data)
                    # res_data = requests.post(url_post, data=json.dumps(data),timeout=2)
                    # print(res_data)
                    # 日志输出
                    logging.info(data)
    
                count += line.count(keyword)
                # print(last_data_err_time)
        return count
    
    # 获取错误日志中最后一行数据
    def last_data(file_path,LogPath):
        res_err_data = read_log(file_path, 'ERROR')
        with open(LogPath,'r+') as f_out:
            out_lines = f_out.readlines()
            last_err = out_lines[-1]
    
            # 判断当前日志是否存在,如果存在则不报送
            global last_data_err_time
            if len(last_data_err_time) == 0:
                last_data_err_time = json.loads(last_err)["O_TIME"]
                return last_err
            elif len(last_data_err_time) != 0:
                if last_data_err_time != json.loads(last_err)["O_TIME"]:
                    last_data_err_time = json.loads(last_err)["O_TIME"]
                    res_data = requests.post(url_post, data=json.dumps(last_err),timeout=2)
                    return last_err
                else:
                    return '暂无新报错'
    
    while 1:
        time.sleep(600)
        res = last_data(file_path,LogPath)
        print(res)    
  • 相关阅读:
    项目数据分析师CPDA印章
    一点想法
    该减肥啦
    PMP证书到手
    Google App Engine之初体验
    转K线理论初级三
    黄小琥没那么简单
    使用webapp框架再现Hello World
    Google App Engine之介绍篇
    转股票中KDJ线的详细分析
  • 原文地址:https://www.cnblogs.com/Tang-Yuan/p/14685618.html
Copyright © 2011-2022 走看看