项目:
概述:生产中户生成大量的系统日志,应用程序日组织,安全日志等,通过对日志的分析,可以了解服务器的负载,健康状况,可以分析客户的分布情况,客户户的行为,甚至基于浙西分析可以做出预测。
一般采集流程:
日志产出--->采集(Logstash,Flime,Scribe)--->存储——》分析-》存储(数据库,NoSQL)-》可视化
注意的是:第一次分析,或者每次分析,尽量保持数据的完整性,把当前层需要的数据自己拿走即可,避免后序分析缺失数据。
1、数据提取:
日志是半结构化数据,是有组织的是有格式的数据,可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里边的数据
半结构化数据:例如html, xml具有一定的格式,数据处理后,可以被理解的,html一般用作数据结构,xml一般用作数据交换使用
结构化数据:能按照行和列组织起来的数据,如MySQL,NoSQL
非结构化数据:MP3,图片,视频(不可以用文本信息理解的数据)
文本分析:
日志是文本文件,需要依赖文件IO,字符串操作,正则表达式技术。
1 s = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800] 2 "GET /favicon.ico HTTP/1.1" 200 4101 "-" 3 "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"' 4 5 def foo(s): 6 alpha = set("""'" []""") 7 start = 0 8 flag = False # 开关遇到 [] " " 9 10 for i, c in enumerate(s): 11 if c in alpha: 12 if c == '[': 13 flag = True 14 start = i + 1 15 elif c == ']': 16 flag = False 17 elif c == '"': 18 flag = not flag 19 if flag: 20 start = i + 1 21 if flag: 22 continue 23 if start == i: 24 start = i + 1 25 continue 26 yield s[start:i] 27 start = i + 1 28 else: 29 if start < len(s): 30 yield s[start:] 31 32 33 # print(list(foo(s)))
1 # line = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800] 2 # "GET /favicon.ico HTTP/1.1" 200 4101 "-" 3 # "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"' 4 # regex = re.compile('(?P<remote>(?:d*.?){4}) - - [(?P<datetime>[^[]]*)] "(?P<method>[^"]+) ' 5 # '(?P<url>[^"]+) (?P<protocol>[^"]+)" (?P<status>d{3}) (?P<length>d+) "[^s]*" "(?P<useragent>[^"]*)"')
并通过分组的技术,获得对应的数据:但是基本没有改变数据样式,保证了数据的完整性。
{'remote': '119.123.183.219', 'datetime': '06/Apr/2017:20:59:39 +0800', 'method': 'GET', 'url': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': '200', 'length': '4101', 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}
注:如果多个数据一一对应的时候,要考虑zip(),再对zip—————》 dict
2、异常处理:
日志在记录的时候,有可能出现因为断电等,(虽然不应断电),出现日志记录不全,这些数据要做处理,目前使用的是return方法,事实上,是可以输出到日志,或者 数据库中,抛出异常,,让调用者获得异常自行处理
3、数据载入:
注:这里是每次处理一个日志文件
4、滑动窗口:
时间窗口分析:
概念:很多数据,例如日志都是和时间相关的都是按照时间顺序产生的,产生的数据分析的时候,要按照时间求值。
interval:表示每一次求值的时间间隔
width:时间窗口宽度,指的一次求值的时间窗口宽度。
当width > interval:
当width = interval
当width < interval:一般不采用,会有数据缺失
5、数据分发:
消息的产生和消息的消要有一定的平衡
消费的速度要略微大于生产的速度,否则最终会产生数据堆积
生产 者与 消费者 解耦,不见面,好处之一,可以产生大量数据,以后在处理,但是还是消费力大于生产力,另外,程序解耦是有好处的!
---消息队列,用来解耦,不让两者直接见面
多个消费者拉数据:
1、都从1开始拉
2、依次1,2,3结束 再4,5,6.。。
但是如果消费速度不同的话,可以使用多个消息队列,各自用各自的
但是如图这样的分发机制,也是有风险的,如果每个消费者需要的数据要一模一样,那就必须创建多条消息队列,每个人一条队列,各自消费各自的。避免争抢。
如何分发:
一对多的消息队列,轮询策略,一个数据通过分发器,发送到n个消费者。
如何注册:
在调度函数内部记录有哪些消费者,每一个消费者拥有自己的队列。
多线程:
由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。
不同的handler函数:根据需求提供不同的handler
6、状态码
304:如果某个页面比较多的304 ,表示缓存明显,大大减少服务器的压力,可以服务更多的用户
如果出现502 是上游服务器挂了,404则是链接错误,网页错误,或者大量出现404,可能有人在嗅探网站资源
7、浏览器分析:
useragent:
传递一个字符串,通过useragent,还可以知道对方浏览器的版本
获取浏览器的版本很重要,可以根据版本确定返回一些信息,如:html4,还是5 写的页面
8、可视化:这里使用的是pandas模块,具体使用在:
1 https://mp.weixin.qq.com/s/LRxnIFRNIF2pqD8d4xRIEw 2 https://mp.weixin.qq.com/s?__biz=MjM5NzU0MzU0Nw==&mid=2651379888&idx=2&sn=bcfeab20d0230e34781660eee81431f8&chksm=bd2413a48a539ab27a42378a893c24927d0b85e325f81d7708ce8c43c6bbbb3f471437507d3e&scene=0#rd
9、完整代码:
1 ''' 2 关于log日志的分析: 3 1、将日志每一段数据提取出来,将状态信息,时间做格式化处理 4 2、统计每个浏览器使用次数 5 3、统计状态信息出现次数 6 7 附加: 8 将统计得到的信息,使用相应模块做出图标显示。 9 10 ''' 11 12 import re 13 import datetime 14 from pathlib import Path 15 import threading 16 from queue import Queue 17 from user_agents import parse 18 19 # line = '119.123.183.219 - - [06/Apr/2017:20:59:39 +0800] 20 # "GET /favicon.ico HTTP/1.1" 200 4101 "-" 21 # "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"' 22 23 # 对每行日志信息的提取(全部信息提取,保留所有可能以后还有可能用到的) 24 regex = re.compile('(?P<ipaddr>[d.]{7,15}) - - [(?P<datetime>[^[]]*)] ' 25 '"(?P<method>[^s]*) (?P<url>[^s]*) (?P<protocol>[^s]*)" (?P<status>d+) (?P<size>d+) [^s]+ "(?P<useragent>[^"]*)"') 26 27 # 对提取的信息做一些数据类型的转换 28 # 如果有需要转换的就调用相应的函数,没有,则是什么返回什么,只不过这里使用了 匿名函数做一个中间件 29 ops = { 30 'datetime':lambda timestr:datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 31 'status':int, 32 'size':int 33 } 34 35 def extrat(line:str): 36 macher = regex.match(line) 37 if macher: 38 return {k:ops.get(k, lambda x:x)(v) for k, v in macher.groupdict().items()} 39 else: 40 return line 41 42 # 获取数据流 43 ## 对单个文件的处理 44 def loadfile(filename:str, encoding='utf-8'): 45 with open('test.log', encoding=encoding) as f: 46 for line in f: 47 # 因为获取的信息如果某一时刻,遇到停电等突发情况,日志记录不全,这时候的信息,记录到 数据库,或者其他地方 48 fileds = extrat(line) 49 if isinstance(fileds, dict): 50 yield fileds 51 else: 52 #还没有用数据库,这里简单的处理一下 53 return fileds 54 55 ## 一般情况不会只有一个日志文件,一般都是一个日志路径,获取相应路径下的所有的日志文件 56 def load(*path, e='*.log', r=False, encoding='utf-8'): 57 for p in path: 58 path = Path(p) 59 if path.is_dir(): 60 logs = path.rglob(e) if r else path.glob(e) 61 for filename in logs: 62 yield from loadfile(filename, encoding=encoding) 63 else: 64 yield from loadfile(p, encoding=encoding) 65 66 # 窗口函数:用来一段时间获取日志信息 67 def window(q:Queue, width, iterval, handler): 68 start = datetime.datetime.strptime('19700101 00:00:01 +0800', '%Y%m%d %H:%M:%S %z') 69 current = datetime.datetime.strptime('19700101 00:00:01 +0800', '%Y%m%d %H:%M:%S %z') 70 buf = [] 71 delta = datetime.timedelta(seconds=width-iterval) 72 while True: 73 data = q.get() # next(iterable) 74 if data: 75 buf.append(data) 76 current = data['datetime'] 77 if (current - start).total_seconds() > iterval: 78 # print(buf) # [{'ipaddr': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': 'GET', 'url': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'size': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}] 79 print('---------') 80 ret = handler(buf) 81 print(ret) 82 start = current 83 buf = [x for x in buf if x['datetime'] > current - delta] 84 85 # status状态码的统计:并计算出现频率 86 def status_handler(iterable:list): 87 status = {} 88 for st in iterable: 89 key = st['status'] 90 status[key] = status.get(key, 0) + 1 91 total = len(iterable) 92 return { k:v/total for k, v in status.items()} 93 94 # 浏览器版本的统计 95 allbrowser = {}# 到目前为止,浏览器使用的次数,全局的 96 def browser_handler(iterable:list): 97 browser = {} 98 for wb in iterable: 99 uastr = wb['useragent'] 100 ua = parse(uastr) 101 key = ua.browser.family, ua.browser.version_string 102 browser[key] = browser.get(key, 0) + 1 103 allbrowser[key] = allbrowser.get(key, 0) + 1 104 return browser 105 106 # 分发器: 107 def disparcher(src): 108 queues = [] 109 threats = [] 110 def reg(width, iterval, handler): 111 q = Queue() 112 t = threading.Thread(target=window, args=(q, width, iterval, handler)) 113 queues.append(q) 114 threats.append(t) 115 def run(): 116 for t in threats: 117 t.start() 118 while True: 119 for data in src: 120 for q in queues: 121 q.put(data) 122 while True: 123 # 假设某一时刻,要取出 allbrowser 进行分析 124 import pandas 125 cmd = input('>>>') 126 if cmd == 'plot': 127 # print(allbrowser) 128 # print(sorted(allbrowser.items(), key=lambda item:item[1], reverse=True)) 129 newdict = {} 130 for (k,ver),val in allbrowser.items(): 131 newdict[k] = newdict.get(k, 0) + val 132 from pandas import Series 133 import numpy 134 import matplotlib.pyplot as plt 135 136 cmd = input(">>>>----->>>>") 137 if cmd == 'plot1': 138 s = Series(newdict) 139 s1 = s.sort_values(ascending=False) 140 print(s1) 141 s1.plot() 142 plt.show() 143 if cmd == 'plot2': 144 s = Series(newdict) 145 print(s) 146 plt.title('browser') 147 plt.plot(s, color='red') 148 plt.show() 149 return reg, run 150 151 if __name__ == '__main__': 152 src = load('.') 153 reg, run = disparcher(src) 154 reg(10, 5, browser_handler) 155 # reg(10, 5, status_handler) 156 run()
1 183.69.210.164 - - [07/Apr/2017:09:33:07 +0800] "GET /app/template/default//images/yun-sj.png HTTP/1.1" 200 362 "http://job.magedu.com/app/template/default//style/css.css" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"
10、总结:
通过这样项目可以把基础部分做一定的练习,
-
首先,获取数据类型,利用正则表达式,进行match匹配,这里使用编译的方式,因为对进来的每一行都要做处理,所以,编译一次,以后可以直接使用
-
对正则表达式 命名分组的练习,以及groupdict的使用
-
对文件的处理,这里使用了内建函数,以及pathlib模块下的Path类,对文件对象的操作是很重要的,所以这里涉及到了,对单个文件对象的使用,其次就是文件路径的操作,文件路径的操作,主要涉及到了文件类型的判断,以及对目录的递归,
-
这里实际上没有用到显示的目录递归,只使用了 通配符递归,并且设置了 是否需要递归匹配的措施,r,根据具体的情况具体分析。
-
时间滑动窗口的使用,时间窗口是一个死循环,(目前认为),因为要不断的输出信息。所以设计到了时间问题,所以用到了datetime模块。
-
使用了队列和线程,对数据分发的措施,实现同样的数据分给不同的线程,实现不同的统计即 代码中的handler,每注册一个handler,将给其一个queue,并且分配一个线程。函数的调用跟线程相关,所以互不干涉,在各自的线程中,压栈,弹出。
-
整个程序的运行时通过最后的一个调度函数实现的,首先是注册,然后,通过轮询的机制,一次启动每个线程,同时,将数据也轮询的分发给 queue对象。
-
注意:这里都是用的死循环,因为服务器日志不停的产生,所以这边要不停的处理分析。
-