zoukankan      html  css  js  c++  java
  • Python监控目录和文件变化

    一、os.listdir

    import os, time
    # Poll a directory and report the entries that appeared or disappeared
    # between two consecutive listings.
    #
    # Only names are compared: content changes are not detected, and an entry
    # that is created and removed again between two polls goes unnoticed.
    path_to_watch = "."
    before = set(os.listdir(path_to_watch))
    while True:
        time.sleep(10)  # poll interval, in seconds
        after = set(os.listdir(path_to_watch))
        # Sorted so the report order is deterministic.
        added = sorted(after - before)
        removed = sorted(before - after)
        # A single-argument print(...) is valid on both Python 2 and 3; the
        # "%s %s" keeps the separating space the old print statement inserted.
        if added:
            print("%s %s" % ("Added: ", ", ".join(added)))
        if removed:
            print("%s %s" % ("Removed: ", ", ".join(removed)))
        before = after

    二、FindFirstChangeNotification

    import os
    
    import win32file
    import win32event
    import win32con
    
    # Watch a directory with the Win32 change-notification API and report
    # which entries were added or deleted each time a notification fires.
    path_to_watch = os.path.abspath(".")

    #
    # FindFirstChangeNotification sets up a handle for watching
    #  file changes. The first parameter is the path to be
    #  watched; the second is a boolean indicating whether the
    #  directories underneath the one specified are to be watched;
    #  the third is a bitmask of flags selecting what kind of changes
    #  to watch for. We're just looking at file additions / deletions.
    #
    change_handle = win32file.FindFirstChangeNotification(
        path_to_watch,
        0,
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
    )

    #
    # Loop forever, listing any file changes. The WaitFor... will
    #  time out every half a second allowing for keyboard interrupts
    #  to terminate the loop.
    #
    try:
        old_path_contents = set(os.listdir(path_to_watch))
        while True:
            result = win32event.WaitForSingleObject(change_handle, 500)

            #
            # If the WaitFor... returned because of a notification (as
            #  opposed to timing out or some error) then look for the
            #  changes in the directory contents.
            #
            if result == win32con.WAIT_OBJECT_0:
                new_path_contents = set(os.listdir(path_to_watch))
                # Sorted so the report order is deterministic.
                added = sorted(new_path_contents - old_path_contents)
                deleted = sorted(old_path_contents - new_path_contents)
                # A single-argument print(...) is valid on both Python 2 and
                # 3; the "%s %s" keeps the separating space the old print
                # statement inserted.
                if added:
                    print("%s %s" % ("Added: ", ", ".join(added)))
                if deleted:
                    print("%s %s" % ("Deleted: ", ", ".join(deleted)))

                old_path_contents = new_path_contents
                # Re-arm the handle so the next change is signalled too.
                win32file.FindNextChangeNotification(change_handle)

    finally:
        # Always release the notification handle, including on Ctrl-C.
        win32file.FindCloseChangeNotification(change_handle)

    三、ReadDirectoryChangesW

    
    
    #coding:utf8
    #author:lcamry

    import os

    import win32file
    import win32con

    # Labels for the numeric action codes that ReadDirectoryChangesW returns
    # alongside each changed file name.
    ACTIONS = {
        1: "Created",
        2: "Deleted",
        3: "Updated",
        4: "Renamed from something",
        5: "Renamed to something",
    }
    # Thanks to Claudio Grondi for the correct set of numbers
    FILE_LIST_DIRECTORY = 0x0001

    path_to_watch = "."
    # Open a handle to the directory itself; it is passed to every
    # ReadDirectoryChangesW call below.
    hDir = win32file.CreateFile(
        path_to_watch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    while True:
        #
        # ReadDirectoryChangesW takes a previously-created
        # handle to a directory, a buffer size for results,
        # a flag to indicate whether to watch subtrees and
        # a filter of what changes to notify.
        #
        # NB Tim Juchcinski reports that he needed to up
        # the buffer size to be sure of picking up all
        # events when a large number of files were
        # deleted at once.
        #
        results = win32file.ReadDirectoryChangesW(
            hDir,
            1024,
            True,
            win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
            win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
            win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
            win32con.FILE_NOTIFY_CHANGE_SIZE |
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
            win32con.FILE_NOTIFY_CHANGE_SECURITY,
            None,
            None
        )
        # Each result is an (action code, file name) pair.
        for action, filename in results:
            full_filename = os.path.join(path_to_watch, filename)
            # A single-argument print(...) is valid on both Python 2 and 3.
            print("%s %s" % (full_filename, ACTIONS.get(action, "Unknown")))

    四、watchdog

    
    
    #coding:utf8
    #author:lcamry
    from watchdog.observers import Observer
    from watchdog.events import *
    import time
    
    class FileEventHandler(FileSystemEventHandler):
        """Event handler that prints one line per moved, created, deleted
        or modified event, distinguishing directories from files."""

        def __init__(self):
            FileSystemEventHandler.__init__(self)

        def on_moved(self, event):
            """Print the source and destination paths of a move."""
            template = ("directory moved from {0} to {1}" if event.is_directory
                        else "file moved from {0} to {1}")
            print(template.format(event.src_path, event.dest_path))

        def on_created(self, event):
            """Print the path of a newly created directory or file."""
            template = ("directory created:{0}" if event.is_directory
                        else "file created:{0}")
            print(template.format(event.src_path))

        def on_deleted(self, event):
            """Print the path of a deleted directory or file."""
            template = ("directory deleted:{0}" if event.is_directory
                        else "file deleted:{0}")
            print(template.format(event.src_path))

        def on_modified(self, event):
            """Print the path of a modified directory or file."""
            template = ("directory modified:{0}" if event.is_directory
                        else "file modified:{0}")
            print(template.format(event.src_path))
    
    if __name__ == "__main__":
        # Watch d:/dcm and its subdirectories, printing every event through
        # FileEventHandler until the user presses Ctrl-C.
        observer = Observer()
        event_handler = FileEventHandler()
        # `recursive` is passed by keyword: it documents the flag and keeps
        # working on watchdog releases that no longer accept it positionally.
        observer.schedule(event_handler, "d:/dcm", recursive=True)
        observer.start()
        try:
            # The observer runs in its own thread; just keep the main thread
            # alive.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            # Stop and join on every exit path, not only on Ctrl-C, so the
            # observer thread never outlives the main loop.
            observer.stop()
            observer.join()

    五、Linux下pyinotify

    #coding:utf8
    #author:lcamry
    import os
    import sys

    import pyinotify

    # NOTE(review): wlog() is expected to come from the project-local
    # `functions` module; it is called below as wlog(category, message).
    from functions import *

    WATCH_PATH = ''  # directory to monitor

    # Refuse to start unless WATCH_PATH is set and exists.
    if not WATCH_PATH:
        wlog('Error', "The WATCH_PATH setting MUST be set.")
        sys.exit()
    else:
        if os.path.exists(WATCH_PATH):
            wlog('Watch status', 'Found watch path: path=%s.' % (WATCH_PATH))
        else:
            wlog('Error', 'The watch path NOT exists, watching stop now: path=%s.' % (WATCH_PATH))
            sys.exit()


    class OnIOHandler(pyinotify.ProcessEvent):
        """Log every create, delete and modify event with the full path."""

        def process_IN_CREATE(self, event):
            wlog('Action', "create file: %s " % os.path.join(event.path, event.name))

        def process_IN_DELETE(self, event):
            wlog('Action', "delete file: %s " % os.path.join(event.path, event.name))

        def process_IN_MODIFY(self, event):
            wlog('Action', "modify file: %s " % os.path.join(event.path, event.name))


    def auto_compile(path='.'):
        """Watch `path` recursively and log events until Ctrl-C.

        Subdirectories are included (rec=True) and directories created
        later are added automatically (auto_add=True).
        """
        wm = pyinotify.WatchManager()
        mask = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
        notifier = pyinotify.ThreadedNotifier(wm, OnIOHandler())
        notifier.start()
        wm.add_watch(path, mask, rec=True, auto_add=True)
        wlog('Start Watch', 'Start monitoring %s' % path)
        while True:
            try:
                notifier.process_events()
                if notifier.check_events():
                    notifier.read_events()
            except KeyboardInterrupt:
                notifier.stop()
                break


    if __name__ == "__main__":
        auto_compile(WATCH_PATH)
  • 相关阅读:
    appium---模拟点击事件
    python发送邮件(smtplib)
    postman---postman提示 Could not get any response
    postman---postman导出python脚本
    postman---postman生成测试报告
    python读写Excel方法(xlwt和xlrd)
    java求两个集合的交集和并集,比较器
    集合类型操作
    Random类和Math.random()方法
    final修饰和StringBuffer的几个案例(拼接,反转,对称操作)
  • 原文地址:https://www.cnblogs.com/lcamry/p/8392376.html
Copyright © 2011-2022 走看看