zoukankan      html  css  js  c++  java
  • python监控目录和文件变化

    转载:https://www.cnblogs.com/lcamry/p/8392376.html

    方式1:

    一、os.listdir

    二、FindFirstChangeNotification

    三、ReadDirectoryChangesW

    四、watchdog

    五、linux下pyinotify

    一、os.listdir

    import os, time
    # Poll a directory and report which entries were added or removed between
    # two consecutive polls.
    path_to_watch = "."
    # Baseline snapshot of the entry names currently in the watched directory.
    before = set(os.listdir(path_to_watch))
    while True:
      # Poll every 10 seconds; an entry that appears and vanishes between two
      # polls is never reported.
      time.sleep(10)
      after = set(os.listdir(path_to_watch))
      # Set differences give the names that appeared / disappeared since the
      # previous poll; sorting keeps the report order deterministic.
      added = sorted(after - before)
      removed = sorted(before - after)
      # print() with one string argument behaves the same on Python 2 and 3;
      # the text (including the double space) matches the old print statement.
      if added:
        print("Added:  " + ", ".join(added))
      if removed:
        print("Removed:  " + ", ".join(removed))
      before = after

    二、FindFirstChangeNotification

    import os
    
    import win32file
    import win32event
    import win32con
    
    path_to_watch = os.path.abspath(".")

    #
    # FindFirstChangeNotification sets up a handle for watching
    #  file changes. The first parameter is the path to be
    #  watched; the second is a boolean indicating whether the
    #  directories underneath the one specified are to be watched;
    #  the third is a list of flags as to what kind of changes to
    #  watch for. We're just looking at file additions / deletions.
    #
    change_handle = win32file.FindFirstChangeNotification(
      path_to_watch,
      0,
      win32con.FILE_NOTIFY_CHANGE_FILE_NAME
    )

    #
    # Loop forever, listing any file changes. The WaitFor... will
    #  time out every half a second allowing for keyboard interrupts
    #  to terminate the loop.
    #
    try:

      # Baseline snapshot of the directory's entry names.
      old_path_contents = set(os.listdir(path_to_watch))
      while True:
        result = win32event.WaitForSingleObject(change_handle, 500)

        #
        # If the WaitFor... returned because of a notification (as
        #  opposed to timing out or some error) then look for the
        #  changes in the directory contents.
        #
        if result == win32con.WAIT_OBJECT_0:
          new_path_contents = set(os.listdir(path_to_watch))
          # The notification only says "something changed"; diff the two
          # snapshots to find out what. Sorted for a deterministic report.
          added = sorted(new_path_contents - old_path_contents)
          deleted = sorted(old_path_contents - new_path_contents)
          # print() with one string argument behaves the same on Python 2
          # and 3; the text matches what the old print statement emitted.
          if added:
            print("Added:  " + ", ".join(added))
          if deleted:
            print("Deleted:  " + ", ".join(deleted))

          old_path_contents = new_path_contents
          # Re-arm the handle so the next change signals it again.
          win32file.FindNextChangeNotification(change_handle)

    finally:
      # Always release the notification handle, including on Ctrl+C.
      win32file.FindCloseChangeNotification(change_handle)

    三、ReadDirectoryChangesW

    #coding:utf8
    #author:lcamry
    
    import os
    
    import win32file
    import win32con
    
    # Human-readable labels for the action codes that ReadDirectoryChangesW
    # returns with each changed name.
    ACTIONS = {
      1 : "Created",
      2 : "Deleted",
      3 : "Updated",
      4 : "Renamed from something",
      5 : "Renamed to something"
    }
    # Thanks to Claudio Grondi for the correct set of numbers
    FILE_LIST_DIRECTORY = 0x0001

    path_to_watch = "."
    # Open the directory itself; FILE_FLAG_BACKUP_SEMANTICS is what allows
    # CreateFile to return a handle to a directory rather than a file.
    hDir = win32file.CreateFile(
      path_to_watch,
      FILE_LIST_DIRECTORY,
      win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
      None,
      win32con.OPEN_EXISTING,
      win32con.FILE_FLAG_BACKUP_SEMANTICS,
      None
    )
    try:
      while True:
        #
        # ReadDirectoryChangesW takes a previously-created
        # handle to a directory, a buffer size for results,
        # a flag to indicate whether to watch subtrees and
        # a filter of what changes to notify.
        #
        # NB Tim Juchcinski reports that he needed to up
        # the buffer size to be sure of picking up all
        # events when a large number of files were
        # deleted at once.
        #
        results = win32file.ReadDirectoryChangesW(
          hDir,
          1024,
          True,
          win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
           win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
           win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
           win32con.FILE_NOTIFY_CHANGE_SIZE |
           win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
           win32con.FILE_NOTIFY_CHANGE_SECURITY,
          None,
          None
        )
        # Each result is an (action code, name) pair; the name is joined onto
        # path_to_watch to build the path that is reported.
        for action, filename in results:
          full_filename = os.path.join(path_to_watch, filename)
          # Concatenate instead of str.format so that non-ASCII names are not
          # forced through an ASCII conversion on Python 2.
          print(full_filename + " " + ACTIONS.get(action, "Unknown"))
    finally:
      # Release the directory handle when the loop is interrupted.
      hDir.Close()

    四、watchdog

    #coding:utf8
    #author:lcamry
    from watchdog.observers import Observer
    from watchdog.events import *
    import time
    
    class FileEventHandler(FileSystemEventHandler):
        """Event handler that prints one line for every event it receives.

        Each callback picks a "directory ..." or "file ..." message depending
        on ``event.is_directory`` and fills in the path(s) carried by the event.
        """

        def __init__(self):
            FileSystemEventHandler.__init__(self)

        def on_moved(self, event):
            """Print the source and destination paths of a moved entry."""
            if event.is_directory:
                template = "directory moved from {0} to {1}"
            else:
                template = "file moved from {0} to {1}"
            print(template.format(event.src_path, event.dest_path))

        def on_created(self, event):
            """Print the path of a newly created entry."""
            template = "directory created:{0}" if event.is_directory else "file created:{0}"
            print(template.format(event.src_path))

        def on_deleted(self, event):
            """Print the path of a deleted entry."""
            template = "directory deleted:{0}" if event.is_directory else "file deleted:{0}"
            print(template.format(event.src_path))

        def on_modified(self, event):
            """Print the path of a modified entry."""
            template = "directory modified:{0}" if event.is_directory else "file modified:{0}"
            print(template.format(event.src_path))
    
    if __name__ == "__main__":
        # Watch d:/dcm and everything beneath it, reporting each event through
        # FileEventHandler.
        observer = Observer()
        event_handler = FileEventHandler()
        # Pass `recursive` by keyword: recent watchdog releases declare it
        # keyword-only, so the positional form raises TypeError there. The
        # keyword form is accepted by older releases as well.
        observer.schedule(event_handler, "d:/dcm", recursive=True)
        observer.start()
        try:
            # Idle here until Ctrl+C; the handler callbacks do the reporting.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        # Wait for the observer to finish before the script exits.
        observer.join()

    五、linux下pyinotify

    #coding:utf8
    #author:lcamry
    
    import os
    # sys.exit() is used below; import it explicitly rather than relying on
    # whatever the star import from `functions` happens to expose.
    import sys
    import pyinotify
    from functions import *

    WATCH_PATH = ''  # directory to monitor

    # Validate the configuration before any watch is set up. Both failure
    # paths log the problem and exit with a non-zero status so that a calling
    # shell or supervisor can tell the script did not start.
    if not WATCH_PATH:
      wlog('Error',"The WATCH_PATH setting MUST be set.")
      sys.exit(1)
    if not os.path.exists(WATCH_PATH):
      wlog('Error','The watch path NOT exists, watching stop now: path=%s.' % (WATCH_PATH))
      sys.exit(1)
    wlog('Watch status','Found watch path: path=%s.' % (WATCH_PATH))
     
    class OnIOHandler(pyinotify.ProcessEvent):
      """Logs an 'Action' entry via wlog for create, delete and modify events.

      The logged path is event.path joined with event.name.
      """

      def process_IN_CREATE(self, event):
        """Log the path carried by an IN_CREATE event."""
        target = os.path.join(event.path, event.name)
        wlog('Action', "create file: %s " % target)

      def process_IN_DELETE(self, event):
        """Log the path carried by an IN_DELETE event."""
        target = os.path.join(event.path, event.name)
        wlog('Action', "delete file: %s " % target)

      def process_IN_MODIFY(self, event):
        """Log the path carried by an IN_MODIFY event."""
        target = os.path.join(event.path, event.name)
        wlog('Action', "modify file: %s " % target)
     
    def auto_compile(path = '.'):
      """Watch *path* and dispatch create/delete/modify events to OnIOHandler.

      The watch is added with rec=True and auto_add=True. The function blocks
      in an event loop until a KeyboardInterrupt (Ctrl+C) arrives, then stops
      the notifier and returns.

      path -- directory to monitor (defaults to the current directory).
      """
      wm = pyinotify.WatchManager()
      mask = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
      # A plain Notifier is used because the loop below drives event handling
      # itself. A started ThreadedNotifier runs the same read/process cycle in
      # its own thread, so both threads would compete for the same events.
      notifier = pyinotify.Notifier(wm, OnIOHandler())
      wm.add_watch(path, mask, rec=True, auto_add=True)
      wlog('Start Watch','Start monitoring %s' % path)
      while True:
        try:
          # Handle whatever was queued by the previous read, then wait for
          # and read the next batch of events.
          notifier.process_events()
          if notifier.check_events():
            notifier.read_events()
        except KeyboardInterrupt:
          notifier.stop()
          break
     
    if __name__ == "__main__":
      # Script entry point: start monitoring the configured directory.
      auto_compile(path=WATCH_PATH)
    转载: https://www.cnblogs.com/lcamry/p/8392376.html
  • 相关阅读:
    python学习 05 函数switch功能
    python学习 04 函数参数
    python学习 03 函数 (只会执行一次return就不会往下执行)
    【转】Jenkins+Ant+Jmeter接口自动化集成测试实例
    【转】python做一个http接口测试框架
    python学习 02 元组
    【转】使用 Python Mock 类进行单元测试
    【转】使用Python学习selenium测试工具
    【转】利用Python中的mock库对Python代码进行模拟测试
    【转】python测试开发面试题
  • 原文地址:https://www.cnblogs.com/lshan/p/15090667.html
Copyright © 2011-2022 走看看