#!/usr/bin/env python
# -*- coding:utf-8 -*-
# A daemon that collects per-channel traffic from an access log.


import subprocess
import multiprocessing
import re
import os
import sys
import time
from os import stat
try:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
    from SocketServer import ThreadingMixIn
except ImportError:
    # Same standard-library classes under their Python 3 module names.
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from socketserver import ThreadingMixIn

__authors__ = ['majing <majing@staff.sina.com.cn>']
__version__ = "1.1"
__date__ = "Aug 14, 2015"
__license__ = "GPL license"

# Path of the null device; fall back to the POSIX name when ``os`` does not
# provide ``devnull``.
NULL_DEVICE = getattr(os, "devnull", "/dev/null")


def _abort(message, stream=None):
    """
    Write *message* to *stream* (stdout by default) and exit with status 1.

    The stream is flushed explicitly because ``os._exit`` terminates the
    process without flushing stdio buffers, which could drop the message.
    """
    if stream is None:
        stream = sys.stdout
    stream.write(message + "\n")
    stream.flush()
    os._exit(1)


def _redirectFileDescriptors():
    """
    Detach from the terminal and redirect stdin, stdout and stderr.

    Every descriptor that refers to a tty is closed, then descriptors
    0, 1 and 2 are pointed at NULL_DEVICE.

    Raises:
        OSError: if NULL_DEVICE cannot be opened or duplicated.
    """
    import resource  # POSIX resource information
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 1024

    # Plain counter instead of range(): the hard limit can be very large and
    # Python 2's range() would materialise the whole list.
    fd = 0
    while fd < maxfd:
        try:
            os.ttyname(fd)
        except OSError:
            # Not open, or not a terminal: leave it alone.
            pass
        else:
            try:
                os.close(fd)
            except OSError:
                pass
        fd += 1

    # Use the descriptor actually returned by open() rather than assuming it
    # is 0; that assumption only holds when stdin was a tty and got closed.
    null_fd = os.open(NULL_DEVICE, os.O_RDWR)
    for std_fd in (0, 1, 2):
        if null_fd != std_fd:
            os.dup2(null_fd, std_fd)
    if null_fd > 2:
        os.close(null_fd)


def python_daemon():
    """
    Make the program run in daemon mode.

    Forks twice (the parents exit with status 0), changes the working
    directory to '/', starts a new session, resets the umask to 0 and
    redirects the standard descriptors. On a non-POSIX system, or when a
    step raises OSError, a message is printed and the process exits with
    status 1.
    """
    if os.name != 'posix':
        _abort('Daemon is only supported on Posix-compliant systems.')

    try:
        if os.fork() > 0:
            os._exit(0)
    except OSError:
        _abort("create daemon failed.")

    os.chdir('/')
    os.setsid()
    os.umask(0)

    try:
        if os.fork() > 0:
            os._exit(0)
        _redirectFileDescriptors()
    except OSError:
        _abort("create daemon failed.")


logfile = '/data0/log/sinaedge/esnv2/access.log'
if not os.path.isfile(logfile):
    # Say why we are quitting instead of exiting silently.
    _abort('log file not found: %s' % logfile, sys.stderr)

# Ensure the traffic data is shared between all processes.
manager = multiprocessing.Manager()
# Proxy dict: channel name -> bytes transferred since the last log rotation.
# NOTE(review): the manager's server process is started here at import time,
# i.e. before python_daemon() forks -- confirm that ordering is intended.
channel_traffics = manager.dict()


# a log generator
def logtailer(logfile, from_start=False):
    """
    Yield complete lines appended to *logfile*, like ``tail -f``.

    Args:
        logfile: path of the file to follow.
        from_start: when False (default) only lines written after the call
            are yielded; when True the file is read from its beginning,
            which is what a consumer wants right after a rotation.

    The generator finishes (the consumer sees StopIteration) once the path
    refers to a different inode than the file that was opened, i.e. the log
    was rotated.

    Raises:
        IOError/OSError: if *logfile* cannot be opened.
    """
    with open(logfile) as f:
        # Inode of the file we really opened, not of whatever the path
        # points to a moment later.
        last_inode = os.fstat(f.fileno()).st_ino
        if not from_start:
            f.seek(0, 2)  # seek to eof
        pending = ''
        while True:
            chunk = f.readline()
            if chunk:
                pending += chunk
                # Only hand out complete lines; a writer may be mid-line.
                if pending.endswith('\n'):
                    yield pending
                    pending = ''
                continue
            try:
                current_inode = stat(logfile).st_ino
            except OSError:
                # Path is missing for a moment while the log is being
                # rotated; keep following the old file until it reappears.
                current_inode = last_inode
            if current_inode != last_inode:
                # Plain return ends the generator; raising StopIteration
                # inside a generator is an error under PEP 479.
                return
            time.sleep(0.05)


def parse_log_line(line):
    """
    Extract ``(channel, transfer_bytes)`` from one access-log line.

    The channel is the 1st whitespace-separated field and the byte count
    the 11th. Returns None when the line has fewer than 11 fields or the
    byte count is not a plain non-negative integer.
    """
    fields = line.split()
    if len(fields) < 11:
        return None
    channel, transfer_bytes = fields[0], fields[10]
    if not transfer_bytes.isdigit():
        return None
    try:
        return channel, int(transfer_bytes)
    except ValueError:
        return None


def analysis_and_format_log():
    """
    Follow the access log forever and accumulate bytes per channel.

    Totals are kept in the shared ``channel_traffics`` dict. When the log
    is rotated the totals are cleared and the new file is followed from
    its beginning. Never returns.
    """
    sourcelines = logtailer(logfile)
    while True:
        try:
            line = next(sourcelines)
        except StopIteration:
            # Log rotated: drop the old totals, close the old generator
            # and start reading the new file from its first line.
            sourcelines.close()
            channel_traffics.clear()
            sourcelines = logtailer(logfile, from_start=True)
            continue
        except (IOError, OSError):
            # The log file cannot be opened right now (for example it has
            # not been recreated yet). Wait a little instead of spinning.
            time.sleep(0.05)
            sourcelines = logtailer(logfile, from_start=True)
            continue

        parsed = parse_log_line(line)
        if parsed is None:
            continue
        channel, transfer_bytes = parsed
        try:
            channel_traffics[channel] = (
                channel_traffics.get(channel, 0) + transfer_bytes)
        except Exception:
            # Deliberate best effort: a failed update of the shared dict
            # must not stop the collector.
            continue


class GetChannelBandHandler(BaseHTTPRequestHandler):
    """HTTP interface for querying the current traffic of a channel."""

    def do_GET(self):
        """
        Reply with the byte total of the channel named by the last path
        segment, or with a "not found" text when it is unknown.
        """
        query_channel = self.path.split('/')[-1]
        # Single lookup: the collector may clear the dict between a
        # membership test and a separate item access.
        current_traffic = channel_traffics.get(query_channel)
        if current_traffic is None:
            body = "404: channel not found"
        else:
            body = str(current_traffic)
        payload = body.encode('ascii')
        # NOTE(review): the status is 200 even when the channel is unknown;
        # kept as-is because existing clients may rely on it.
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
        return


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that handles each request in its own thread."""
    pass


if __name__ == "__main__":
    python_daemon()
    # Start analysis_and_format_log in a background process.
    d = multiprocessing.Process(name='daemon', target=analysis_and_format_log)
    d.daemon = True
    d.start()
    server = ThreadedHTTPServer(("", 8888), GetChannelBandHandler)
    print('Starting server on 8888, use <Ctrl-C> to stop')
    server.serve_forever()