zoukankan      html  css  js  c++  java
  • 使用pyinotify实现加强版的linux tail -f 命令,并且对日志类型的文本进行单独优化着色显示。

    tail -f命令不能自动切换切片文件,例如日志是每100M生成一个新文件,tail -f不能自动的切换文件,必须关闭然后重新运行tail -f

    此篇使用pyinotify,检测文件更新,并实现tail -f以外,还能自动识别切换切片文件。而且针对日志类型的文件做了单独样式优化。

    运行 ./tailf.py + 文件路径

    此文件够自动从普通文本中,对日志就行着色处理,如果不是日志类型的文件,将直接输出,不进行着色处理。

    tailf.py文件的实现代码如下:

     
    import os
    import sys
    import re
    import pyinotify
    
    DIRMASK = pyinotify.IN_MODIFY 
              | pyinotify.IN_ATTRIB 
              | pyinotify.IN_MOVE_SELF 
              | pyinotify.IN_CREATE
    
    
    class Handler(pyinotify.ProcessEvent):
    
        def __init__(self, filename):
            self._fh = None
            self._path = filename
    
            super(Handler, self).__init__()
    
        def my_init(self):
            try:
                self._fh = open(self._path, 'r')
            except IOError as e:
                sys.stderr.write('open file failed, %s' % e)
            else:
                self._fh.seek(0, 2)
    
        def process_IN_CREATE(self, event):
            path = self._path
    
            if path in os.path.join(event.path, event.name):
                if hasattr(self, 'fh'):
                    self._fh.close()
    
                try:
                    self._fh = open(self._path, 'r')
                except IOError as e:
                    sys.stderr.write('open file failed, %s' % e)
                else:
                    self._fh.seek(0, 2)
    
                    for r in self._fh.readlines():
                        # sys.stdout.write(r)
                        process_line(r)
    
        def process_IN_MODIFY(self, event):
            path = self._path
    
            if path not in os.path.join(event.path, event.name):
                return
    
            if not self._fh.closed:
                for r in self._fh.readlines():
                    # sys.stdout.write(r)
                    process_line(r)
    
        def process_IN_MOVE_SELF(self, event):
            path = self._path
    
            if path in os.path.join(event.path, event.name):
                sys.stderr.write('monitor file move')
    
        def process_IN_ATTRIB(self, event):
            pass
    
    
    class Tailer(object):
    
        def __init__(self, filename):
            super(Tailer, self).__init__()
            self._path = filename
            self._notifier = None
    
            self._init()
    
        def __del__(self):
            self._notifier.stop()
    
        def _init(self):
            path = self._path
    
            index = path.rfind('/')
            wm = pyinotify.WatchManager()
            wm.add_watch(path[:index], DIRMASK)
    
            handler = Handler(path)
            self._notifier = pyinotify.Notifier(wm, handler)
    
        def run(self):
            self.read_last_10240_word()
            while True:
                self._notifier.process_events()
    
                if self._notifier.check_events():
                    self._notifier.read_events()
    
    
        def read_last_10240_word(self):
            with open(self._path,'rb') as f:
                f.seek(-10240,2)
                for l in f.readlines():
                    # print(l.decode())
                    process_line(l.decode())
    
    
    def process_line(line_str):
        pass
        #matcher = re.search(r'd{4}-d{2}-d{2}.*?- (DEBUG|INFO|WARNING|ERROR|CRITICAL) -[sS]*?(File ".*?.py", line d*)', line_str)
        matcher = re.search(r'^d{4}-d{2}-d{2}.*?(DEBUG|INFO|WARNING|ERROR|CRITICAL)', line_str)
        if not matcher:
            print(line_str)
        else:
            log_level_str = matcher.group(1)
            if log_level_str == 'DEBUG':
                print('33[0;32m%s33[0m' % line_str)
            elif log_level_str == 'INFO':
                print('33[0;96m%s33[0m' % line_str)
            elif log_level_str == 'WARNING':
                print('33[0;33m%s33[0m' % line_str)
            elif log_level_str == 'ERROR':
                print('33[0;35m%s33[0m' % line_str)
            elif log_level_str == 'CRITICAL':
                print('33[0;31m%s33[0m' % line_str)
    
    
    
    if __name__ == '__main__':
        if len(sys.argv[1:]) != 1:
            print('Usage: python tail.py </path/to/filename>')
            sys.exit(0)
    
        path = sys.argv[1]
        Tailer(path).run()
  • 相关阅读:
    这次安装不太一样
    解惑C#不用释放内存(4)C#为何不用释放内存
    解惑C#不用释放内存(3)C++如何分配内存
    解惑C#不用释放内存(2)分配内存
    解惑C#不用释放内存(1)章节重点
    学习Java异常理解运行期异常
    学习Java声明异常throws
    学习java异常理解编译期异常
    学习java异常-前不久出现的问题
    学习java异常
  • 原文地址:https://www.cnblogs.com/ydf0509/p/9387686.html
Copyright © 2011-2022 走看看