zoukankan      html  css  js  c++  java
  • Python监控目录和文件变化

    一、os.listdir

    import os, time
    # Poll a directory every 10 seconds and report the names that appeared in
    # or disappeared from its listing since the previous poll.
    path_to_watch = "."

    # A set of names is all that is needed to compare two listings.
    before = set(os.listdir(path_to_watch))
    while True:
        time.sleep(10)
        after = set(os.listdir(path_to_watch))
        # Sorted so that the report order is the same from run to run.
        added = sorted(after - before)
        removed = sorted(before - after)
        # A single-argument print call behaves the same on Python 2 and 3;
        # the "%s %s" format keeps the space the print statement used to put
        # between the label and the list of names.
        if added:
            print("%s %s" % ("Added: ", ", ".join(added)))
        if removed:
            print("%s %s" % ("Removed: ", ", ".join(removed)))
        before = after

    二、FindFirstChangeNotification

    import os
    
    import win32file
    import win32event
    import win32con
    
    path_to_watch = os.path.abspath(".")

    #
    # FindFirstChangeNotification sets up a handle for watching
    #  file changes. The first parameter is the path to be
    #  watched; the second is a boolean indicating whether the
    #  directories underneath the one specified are to be watched;
    #  the third is a list of flags as to what kind of changes to
    #  watch for. We're just looking at file additions / deletions.
    #
    change_handle = win32file.FindFirstChangeNotification(
        path_to_watch,
        0,
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
    )

    #
    # Loop forever, listing any file changes. The WaitFor... will
    #  time out every half a second allowing for keyboard interrupts
    #  to terminate the loop.
    #
    try:
        # A set of names is all that is needed to compare two listings.
        old_path_contents = set(os.listdir(path_to_watch))
        while True:
            result = win32event.WaitForSingleObject(change_handle, 500)

            #
            # If the WaitFor... returned because of a notification (as
            #  opposed to timing out or some error) then look for the
            #  changes in the directory contents.
            #
            if result == win32con.WAIT_OBJECT_0:
                new_path_contents = set(os.listdir(path_to_watch))
                # Sorted so that the report order is the same from run to run.
                added = sorted(new_path_contents - old_path_contents)
                deleted = sorted(old_path_contents - new_path_contents)
                # A single-argument print call behaves the same on Python 2
                # and 3; the "%s %s" format keeps the space the print
                # statement used to put between the label and the names.
                if added:
                    print("%s %s" % ("Added: ", ", ".join(added)))
                if deleted:
                    print("%s %s" % ("Deleted: ", ", ".join(deleted)))

                old_path_contents = new_path_contents
                # Re-arm the handle so the next wait can be signalled again.
                win32file.FindNextChangeNotification(change_handle)

    finally:
        # Always release the notification handle, including on Ctrl-C.
        win32file.FindCloseChangeNotification(change_handle)

    三、ReadDirectoryChanges 

    
    
    #coding:utf8
    #author:lcamry

    import os

    import win32file
    import win32con

    # Maps the action code reported for each change to a readable label.
    ACTIONS = {
        1: "Created",
        2: "Deleted",
        3: "Updated",
        4: "Renamed from something",
        5: "Renamed to something",
    }
    # Thanks to Claudio Grondi for the correct set of numbers
    FILE_LIST_DIRECTORY = 0x0001

    path_to_watch = "."
    hDir = win32file.CreateFile(
        path_to_watch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ
        | win32con.FILE_SHARE_WRITE
        | win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    while True:
        #
        # ReadDirectoryChangesW takes a previously-created
        # handle to a directory, a buffer size for results,
        # a flag to indicate whether to watch subtrees and
        # a filter of what changes to notify.
        #
        # NB Tim Juchcinski reports that he needed to up
        # the buffer size to be sure of picking up all
        # events when a large number of files were
        # deleted at once.
        #
        results = win32file.ReadDirectoryChangesW(
            hDir,
            1024,
            True,
            win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
            | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
            | win32con.FILE_NOTIFY_CHANGE_SIZE
            | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
            | win32con.FILE_NOTIFY_CHANGE_SECURITY,
            None,
            None
        )
        # Each result is an (action code, name) pair; the name is joined onto
        # the watched path before printing.
        for action, changed_name in results:
            full_filename = os.path.join(path_to_watch, changed_name)
            # A single-argument print call behaves the same on Python 2 and 3.
            print("%s %s" % (full_filename, ACTIONS.get(action, "Unknown")))

    四、watchdog

    
    
    #coding:utf8
    #author:lcamry
    from watchdog.observers import Observer
    from watchdog.events import *
    import time
    
    class FileEventHandler(FileSystemEventHandler):
        """Event handler that prints one line per move/create/delete/modify event.

        Every callback checks ``event.is_directory`` to choose between the
        "directory ..." and "file ..." wording, then prints the path(s)
        carried by the event.
        """

        def __init__(self):
            FileSystemEventHandler.__init__(self)

        def on_moved(self, event):
            """Print the source and destination path of a moved entry."""
            if event.is_directory:
                template = "directory moved from {0} to {1}"
            else:
                template = "file moved from {0} to {1}"
            print(template.format(event.src_path, event.dest_path))

        def on_created(self, event):
            """Print the path of a newly created entry."""
            template = (
                "directory created:{0}" if event.is_directory
                else "file created:{0}"
            )
            print(template.format(event.src_path))

        def on_deleted(self, event):
            """Print the path of a deleted entry."""
            template = (
                "directory deleted:{0}" if event.is_directory
                else "file deleted:{0}"
            )
            print(template.format(event.src_path))

        def on_modified(self, event):
            """Print the path of a modified entry."""
            template = (
                "directory modified:{0}" if event.is_directory
                else "file modified:{0}"
            )
            print(template.format(event.src_path))
    
    if __name__ == "__main__":
        # Watch d:/dcm and everything beneath it until Ctrl-C is pressed.
        observer = Observer()
        event_handler = FileEventHandler()
        # `recursive` is passed by keyword: recent watchdog releases declare
        # it keyword-only, and older releases accept the keyword form too.
        observer.schedule(event_handler, "d:/dcm", recursive=True)
        observer.start()
        try:
            # The observer was started above; this loop only keeps the main
            # thread alive so that Ctrl-C can be caught here.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        # Wait for the observer thread to finish before exiting.
        observer.join()

    五、Linux 下的 pyinotify

    #coding:utf8
    #author:lcamry
    import os
    import sys  # sys.exit() is called below; do not rely on the star import for it

    import pyinotify

    # wlog() is not defined in this snippet; it is expected to be provided by
    # this star import.
    from functions import *

    WATCH_PATH = ''  # directory to monitor

    # Refuse to start unless WATCH_PATH is set and exists on disk.
    if not WATCH_PATH:
        wlog('Error', "The WATCH_PATH setting MUST be set.")
        sys.exit()
    else:
        if os.path.exists(WATCH_PATH):
            wlog('Watch status', 'Found watch path: path=%s.' % (WATCH_PATH))
        else:
            wlog('Error', 'The watch path NOT exists, watching stop now: path=%s.' % (WATCH_PATH))
            sys.exit()


    class OnIOHandler(pyinotify.ProcessEvent):
        """Log every create, delete and modify event through wlog()."""

        def process_IN_CREATE(self, event):
            """Log the full path of a created entry."""
            wlog('Action', "create file: %s " % os.path.join(event.path, event.name))

        def process_IN_DELETE(self, event):
            """Log the full path of a deleted entry."""
            wlog('Action', "delete file: %s " % os.path.join(event.path, event.name))

        def process_IN_MODIFY(self, event):
            """Log the full path of a modified entry."""
            wlog('Action', "modify file: %s " % os.path.join(event.path, event.name))


    def auto_compile(path='.'):
        """Watch *path* for create/delete/modify events until Ctrl-C.

        Events are dispatched to an OnIOHandler instance. The watch is added
        with rec=True and auto_add=True.
        """
        wm = pyinotify.WatchManager()
        mask = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
        notifier = pyinotify.ThreadedNotifier(wm, OnIOHandler())
        notifier.start()
        wm.add_watch(path, mask, rec=True, auto_add=True)
        wlog('Start Watch', 'Start monitoring %s' % path)
        # NOTE(review): the notifier was already started above and this loop
        # also drives the same notifier by hand - confirm both are intended.
        while True:
            try:
                notifier.process_events()
                if notifier.check_events():
                    notifier.read_events()
            except KeyboardInterrupt:
                notifier.stop()
                break


    if __name__ == "__main__":
        auto_compile(WATCH_PATH)
  • 相关阅读:
    个人冲刺二(7)
    个人冲刺二(6)
    个人冲刺二(5)
    个人冲刺二(4)
    对称二叉树 · symmetric binary tree
    108 Convert Sorted Array to Binary Search Tree数组变成高度平衡的二叉树
    530.Minimum Absolute Difference in BST 二叉搜索树中的最小差的绝对值
    pp 集成工程师 mism师兄问一问
    17. Merge Two Binary Trees 融合二叉树
    270. Closest Binary Search Tree Value 二叉搜索树中,距离目标值最近的节点
  • 原文地址:https://www.cnblogs.com/lcamry/p/8392376.html
Copyright © 2011-2022 走看看