zoukankan      html  css  js  c++  java
  • 日志分析(非Class封装,非数据库处理)

    项目:
    概述:生产中户生成大量的系统日志,应用程序日组织,安全日志等,通过对日志的分析,可以了解服务器的负载,健康状况,可以分析客户的分布情况,客户户的行为,甚至基于浙西分析可以做出预测。
      一般采集流程:
        日志产出--->采集(Logstash,Flime,Scribe)--->存储——》分析-》存储(数据库,NoSQL)-》可视化
        注意的是:第一次分析,或者每次分析,尽量保持数据的完整性,把当前层需要的数据自己拿走即可,避免后序分析缺失数据。
      
      1、数据提取:
        日志是半结构化数据,是有组织的是有格式的数据,可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里边的数据
            半结构化数据:例如html, xml具有一定的格式,数据处理后,可以被理解的,html一般用作数据结构,xml一般用作数据交换使用
            结构化数据:能按照行和列组织起来的数据,如MySQL,NoSQL
            非结构化数据:MP3,图片,视频(不可以用文本信息理解的数据)
        文本分析:
          日志是文本文件,需要依赖文件IO,字符串操作,正则表达式技术。
     1 s = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800]
     2  "GET /favicon.ico HTTP/1.1" 200 4101 "-" 
     3  "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'
     4 
     5 def foo(s):
     6     alpha = set("""'" []""")
     7     start = 0
     8     flag = False # 开关遇到 []  "  "
     9 
    10     for i, c in enumerate(s):
    11         if c in alpha:
    12             if c == '[':
    13                 flag = True
    14                 start = i + 1
    15             elif c == ']':
    16                 flag = False
    17             elif c == '"':
    18                 flag = not flag
    19                 if flag:
    20                     start = i + 1
    21             if flag:
    22                 continue
    23             if start == i:
    24                 start = i + 1
    25                 continue
    26             yield s[start:i]
    27             start = i + 1
    28     else:
    29         if start < len(s):
    30             yield  s[start:]
    31 
    32 
    33 # print(list(foo(s)))
    使用字符串切割,效率不见得低,但是太繁琐,用来练习
    1 # line = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800]
    2 #  "GET /favicon.ico HTTP/1.1" 200 4101 "-" 
    3 # "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'
    4 # regex = re.compile('(?P<remote>(?:d*.?){4}) - - [(?P<datetime>[^[]]*)] "(?P<method>[^"]+) '
    5 #                    '(?P<url>[^"]+) (?P<protocol>[^"]+)" (?P<status>d{3}) (?P<length>d+) "[^s]*" "(?P<useragent>[^"]*)"')
    使用正则表达式切割
        并通过分组的技术,获得对应的数据:但是基本没有改变数据样式,保证了数据的完整性。
    {'remote': '119.123.183.219', 'datetime': '06/Apr/2017:20:59:39 +0800', 'method': 'GET', 'url': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': '200', 'length': '4101', 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}

        注:如果多个数据一一对应的时候,要考虑zip(),再对zip—————》 dict

     2、异常处理: 
        日志在记录的时候,有可能出现因为断电等,(虽然不应断电),出现日志记录不全,这些数据要做处理,目前使用的是return方法,事实上,是可以输出到日志,或者 数据库中,抛出异常,,让调用者获得异常自行处理

     3、数据载入:

     
          
      注:这里是每次处理一个日志文件


      4、滑动窗口:
        时间窗口分析:
        概念:很多数据,例如日志都是和时间相关的都是按照时间顺序产生的,产生的数据分析的时候,要按照时间求值。
          interval:表示每一次求值的时间间隔
          width:时间窗口宽度,指的一次求值的时间窗口宽度。
          当width > interval:
          


          当width = interval
            


          当width < interval:一般不采用,会有数据缺失
        


       5、数据分发:   
    消息的产生和消息的消要有一定的平衡

    消费的速度要略微大于生产的速度,否则最终会产生数据堆积

    生产 者与 消费者 解耦,不见面,好处之一,可以产生大量数据,以后在处理,但是还是消费力大于生产力,另外,程序解耦是有好处的!
    ---消息队列,用来解耦,不让两者直接见面

    多个消费者拉数据:
    1、都从1开始拉
    2、依次1,2,3结束 再4,5,6.。。
    但是如果消费速度不同的话,可以使用多个消息队列,各自用各自的

            但是如图这样的分发机制,也是有风险的,如果每个消费者需要的数据要一模一样,那就必须创建多条消息队列,每个人一条队列,各自消费各自的。避免争抢。

        如何分发:
          一对多的消息队列,轮询策略,一个数据通过分发器,发送到n个消费者。
        如何注册:
          在调度函数内部记录有哪些消费者,每一个消费者拥有自己的队列。
        多线程:
          由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。
          


         不同的handler函数:根据需求提供不同的handler



     

    6、状态码
    304:如果某个页面比较多的304 ,表示缓存明显,大大减少服务器的压力,可以服务更多的用户
    如果出现502 是上游服务器挂了,404则是链接错误,网页错误,或者大量出现404,可能有人在嗅探网站资源



    7、浏览器分析:
    useragent:
    传递一个字符串,通过useragent,还可以知道对方浏览器的版本

    获取浏览器的版本很重要,可以根据版本确定返回一些信息,如:html4,还是5 写的页面

    8、可视化:这里使用的是pandas模块,具体使用在:
    1 https://mp.weixin.qq.com/s/LRxnIFRNIF2pqD8d4xRIEw
    2 https://mp.weixin.qq.com/s?__biz=MjM5NzU0MzU0Nw==&mid=2651379888&idx=2&sn=bcfeab20d0230e34781660eee81431f8&chksm=bd2413a48a539ab27a42378a893c24927d0b85e325f81d7708ce8c43c6bbbb3f471437507d3e&scene=0#rd
      



       9、完整代码:

      1 '''
      2 关于log日志的分析:
      3     1、将日志每一段数据提取出来,将状态信息,时间做格式化处理
      4     2、统计每个浏览器使用次数
      5     3、统计状态信息出现次数
      6     
      7     附加:
      8         将统计得到的信息,使用相应模块做出图标显示。
      9 
     10 '''
     11 
     12 import re
     13 import datetime
     14 from pathlib import Path
     15 import threading
     16 from queue import Queue
     17 from user_agents import parse
     18 
     19 # line = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800]
     20 #  "GET /favicon.ico HTTP/1.1" 200 4101 "-" 
     21 # "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'
     22 
     23 # 对每行日志信息的提取(全部信息提取,保留所有可能以后还有可能用到的)
     24 regex = re.compile('(?P<ipaddr>[d.]{7,15}) - - [(?P<datetime>[^[]]*)] '
     25                    '"(?P<method>[^s]*) (?P<url>[^s]*) (?P<protocol>[^s]*)" (?P<status>d+) (?P<size>d+) [^s]+ "(?P<useragent>[^"]*)"')
     26 
     27 # 对提取的信息做一些数据类型的转换
     28 # 如果有需要转换的就调用相应的函数,没有,则是什么返回什么,只不过这里使用了 匿名函数做一个中间件
     29 ops = {
     30     'datetime':lambda timestr:datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
     31     'status':int,
     32     'size':int
     33 }
     34 
     35 def extrat(line:str):
     36     macher = regex.match(line)
     37     if macher:
     38         return  {k:ops.get(k, lambda x:x)(v) for k, v in macher.groupdict().items()}
     39     else:
     40         return line
     41 
     42 # 获取数据流
     43 ## 对单个文件的处理
     44 def loadfile(filename:str, encoding='utf-8'):
     45     with open('test.log', encoding=encoding) as f:
     46         for line in f:
     47             # 因为获取的信息如果某一时刻,遇到停电等突发情况,日志记录不全,这时候的信息,记录到 数据库,或者其他地方
     48             fileds = extrat(line)
     49             if isinstance(fileds, dict):
     50                 yield fileds
     51             else:
     52                 #还没有用数据库,这里简单的处理一下
     53                 return fileds
     54 
     55 ## 一般情况不会只有一个日志文件,一般都是一个日志路径,获取相应路径下的所有的日志文件
     56 def load(*path, e='*.log', r=False, encoding='utf-8'):
     57     for p in path:
     58         path = Path(p)
     59         if path.is_dir():
     60             logs = path.rglob(e) if r else path.glob(e)
     61             for filename in logs:
     62                 yield from loadfile(filename, encoding=encoding)
     63         else:
     64             yield from loadfile(p, encoding=encoding)
     65 
     66 # 窗口函数:用来一段时间获取日志信息
     67 def window(q:Queue, width, iterval, handler):
     68     start = datetime.datetime.strptime('19700101 00:00:01 +0800', '%Y%m%d %H:%M:%S %z')
     69     current = datetime.datetime.strptime('19700101 00:00:01 +0800', '%Y%m%d %H:%M:%S %z')
     70     buf = []
     71     delta = datetime.timedelta(seconds=width-iterval)
     72     while True:
     73         data = q.get() # next(iterable)
     74         if data:
     75             buf.append(data)
     76             current = data['datetime']
     77         if (current - start).total_seconds() > iterval:
     78             # print(buf) # [{'ipaddr': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': 'GET', 'url': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'size': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}]
     79             print('---------')
     80             ret = handler(buf)
     81             print(ret)
     82             start = current
     83             buf = [x for x in buf if x['datetime'] > current - delta]
     84 
     85 # status状态码的统计:并计算出现频率
     86 def status_handler(iterable:list):
     87     status = {}
     88     for st in iterable:
     89         key = st['status']
     90         status[key] = status.get(key, 0) + 1
     91     total = len(iterable)
     92     return { k:v/total for k, v in status.items()}
     93 
     94 # 浏览器版本的统计
     95 allbrowser = {}# 到目前为止,浏览器使用的次数,全局的
     96 def browser_handler(iterable:list):
     97     browser = {}
     98     for wb in iterable:
     99         uastr = wb['useragent']
    100         ua = parse(uastr)
    101         key = ua.browser.family, ua.browser.version_string
    102         browser[key] = browser.get(key, 0) + 1
    103         allbrowser[key] = allbrowser.get(key, 0) + 1
    104     return browser
    105 
    106 # 分发器:
    107 def disparcher(src):
    108     queues = []
    109     threats = []
    110     def reg(width, iterval, handler):
    111         q = Queue()
    112         t = threading.Thread(target=window, args=(q, width, iterval, handler))
    113         queues.append(q)
    114         threats.append(t)
    115     def run():
    116         for t in threats:
    117             t.start()
    118         while True:
    119             for data in src:
    120                 for q in queues:
    121                     q.put(data)
    122             while True:
    123             # 假设某一时刻,要取出 allbrowser 进行分析
    124                 import pandas
    125                 cmd = input('>>>')
    126                 if cmd == 'plot':
    127                     # print(allbrowser)
    128                     # print(sorted(allbrowser.items(), key=lambda item:item[1], reverse=True))
    129                     newdict  = {}
    130                     for (k,ver),val in allbrowser.items():
    131                         newdict[k] = newdict.get(k, 0) + val
    132                     from pandas import Series
    133                     import numpy
    134                     import matplotlib.pyplot as plt
    135 
    136                     cmd = input(">>>>----->>>>")
    137                     if cmd == 'plot1':
    138                         s = Series(newdict)
    139                         s1 = s.sort_values(ascending=False)
    140                         print(s1)
    141                         s1.plot()
    142                         plt.show()
    143                     if cmd == 'plot2':
    144                         s = Series(newdict)
    145                         print(s)
    146                         plt.title('browser')
    147                         plt.plot(s, color='red')
    148                         plt.show()
    149     return reg, run
    150 
    151 if __name__ == '__main__':
    152     src = load('.')
    153     reg, run = disparcher(src)
    154     reg(10, 5, browser_handler)
    155     # reg(10, 5, status_handler)
    156     run()
    完整代码
    1 183.69.210.164 - - [07/Apr/2017:09:33:07 +0800] "GET /app/template/default//images/yun-sj.png HTTP/1.1" 200 362 "http://job.magedu.com/app/template/default//style/css.css" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"
    test.log内容样例
      10、总结:
        通过这样项目可以把基础部分做一定的练习,
      1. 首先,获取数据类型,利用正则表达式,进行match匹配,这里使用编译的方式,因为对进来的每一行都要做处理,所以,编译一次,以后可以直接使用
      2. 对正则表达式 命名分组的练习,以及groupdict的使用
      3. 对文件的处理,这里使用了内建函数,以及pathlib模块下的Path类,对文件对象的操作是很重要的,所以这里涉及到了,对单个文件对象的使用,其次就是文件路径的操作,文件路径的操作,主要涉及到了文件类型的判断,以及对目录的递归,
      4. 这里实际上没有用到显示的目录递归,只使用了 通配符递归,并且设置了 是否需要递归匹配的措施,r,根据具体的情况具体分析。
      5. 时间滑动窗口的使用,时间窗口是一个死循环,(目前认为),因为要不断的输出信息。所以设计到了时间问题,所以用到了datetime模块。
      6. 使用了队列和线程,对数据分发的措施,实现同样的数据分给不同的线程,实现不同的统计即 代码中的handler,每注册一个handler,将给其一个queue,并且分配一个线程。函数的调用跟线程相关,所以互不干涉,在各自的线程中,压栈,弹出。
      7. 整个程序的运行时通过最后的一个调度函数实现的,首先是注册,然后,通过轮询的机制,一次启动每个线程,同时,将数据也轮询的分发给 queue对象。
      8. 注意:这里都是用的死循环,因为服务器日志不停的产生,所以这边要不停的处理分析。
          

























    为什么要坚持,想一想当初!
  • 相关阅读:
    Redis缓存穿透,缓存击穿,缓存雪崩
    Redis持久化机制
    Docker小白到实战之常用命令演示,通俗易懂
    分布式事务最终一致性-CAP框架轻松搞定
    gRPC四种模式、认证和授权实战演示,必赞~~~
    Docker小白到实战之开篇概述
    郑州 | 7月20日,想想都后怕
    避不开的分布式事务
    c++实现十大经典排序算法
    浏览器缓存机制总结
  • 原文地址:https://www.cnblogs.com/JerryZao/p/9635440.html
Copyright © 2011-2022 走看看