监控文件系统事件的Python库-watchdog

在日常工作中,会遇到需要检测某个文件夹下的变动情况,再根据对应的行为做出不同的操作。

在python中可以使用第三方模块watchdog 来实现,下面是具体的介绍与使用

watchdog安装

watchdog 是一个实时监控库,其原理是通过操作系统的时间触发,需要循环等待。

官方文档:https://python-watchdog.readthedocs.io/en/stable/

pip install watchdog

我们以官网提供的简单实例入手

import sysimport timeimport loggingfrom watchdog.observers import Observerfrom watchdog.events import LoggingEventHandler
if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') path = sys.argv[1] if len(sys.argv) > 1 else '.' # 内置的LoggingEventHandler event_handler = LoggingEventHandler() observer = Observer() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: time.sleep(1) finally: observer.stop() observer.join()

observer = Observer():创建一个观察者对象。

observer.schedule():声明一个定时任务。

observer.start():启动定时任务。

文件系统事件的基类类型定义

watchdog.events.FileSystemEvent(event_type,src_path,is_directory=False):

  • event.event_type:时间类别,moved deleted created modified

  • event.src_path:触发该事件的文件或者目录的路径

  • event.is_directory:该事件是否由一个目录触发

由watchdog.events.FileSystemEvent基类派生的子类

  • watchdog.events.FileDeletedEvent() 文件被删除时触发该事件

  • watchdog.events.DirDeletedEvent() 目录被删除时触发该事件

  • watchdog.events.DirCreatedEvent() 目录被建立时触发该事件

  • watchdog.events.FileCreatedEvent() 文件被建立时触发该事件

  • watchdog.events.FileModifiedEvent() 文件被修改时触发该事件

  • watchdog.events.DirModifiedEvent() 目录被修改触发该事件

  • watchdog.events.FileMovedEvent() 文件被移动触发该事件

  • watchdog.events.DirMovedEvent() 目录被移动触发该事件

observer.schedule(event_handler, path, recursive=False) 的详细说明

    每一次调用schedule() 对一个路径( path )进行监控处理叫做 watchschedule() 方法会返回这个 watch ,我们可以对 watch 增加多个 event 事件处理器。

  • observer.add_handler_for_watch(event_handler, watch):添加1个新的事件处理器到 ;

  • observer.remove_handler_for_watch(event_handler, watch):从 watch 移除1个事件处理器;

  • observer.unschedule(watch):移除1个 watch 及其所有事件处理器;

  • observer.unschedule_all():移除所有 watch 及关联的事件处理器;

  • observer.on_thread_stop():等同于 observer.stop()

自定义监听器,然后根据触发的事件进行操作

import datetimeimport timefrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandler  class MyEventHandler(FileSystemEventHandler):    def __init__(self):        FileSystemEventHandler.__init__(self)     def on_any_event(self, event):        print("-----")        print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))     # 移动    def on_moved(self, event):        if event.is_directory:            print("目录 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))        else:            print("文件 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))     # 新建    def on_created(self, event):        if event.is_directory:            print("目录 created:{file_path}".format(file_path=event.src_path))        else:            print("文件 created:{file_path}".format(file_path=event.src_path))     # 删除    def on_deleted(self, event):        if event.is_directory:            print("目录 deleted:{file_path}".format(file_path=event.src_path))        else:            print("文件 deleted:{file_path}".format(file_path=event.src_path))     # 修改    def on_modified(self, event):         if event.is_directory:            print("目录 modified:{file_path}".format(file_path=event.src_path))        else:            print("文件 modified:{file_path}".format(file_path=event.src_path)) 
if __name__ == '__main__': path = "/otp/five-minute-sre" myEventHandler = MyEventHandler() # 观察者 observer = Observer() # recursive:True 递归的检测文件夹下所有文件变化。 observer.schedule(myEventHandler, path, recursive=True) # 观察线程,非阻塞式的。 observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()

   

    在实际使用的时候会发现,新建一个文件,可以看到触发了多个事件。

   就是FileCreatedEventFileModifiedEvent 事件,对应上面代码中 on_createdon_modified 的方法被调用

    其原因在于 f = open("file.txt", "w") 这样文件创建动作会触发 FileCreatedEvent 事件,执行 on_created 函数,往文件写数据的 f.flush()f.close() 操作,会触发 FileModifiedEvent 事件,执行 on_modified 函数,所以触发了 3次。


写一个用例的demo脚本

需求:监听指定目录下是否新有创建文件夹,然后对新创建的文件启动新的监听,最后对文件夹下的文件做处理的处理

#!/usr/bin/python"""用于实时检测文件生成,并且做对应的处理"""import time,threading,osfrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerimport queueimport signalimport shutil,subprocessimport loggingimport logging.handlersimport datetime"""定义日志格式"""# 创建Logger并进行设置logger = logging.getLogger('SYNC DAS FILE')logger.setLevel(logging.DEBUG)#一天轮转一次日志,保留7天日志rf_handler = logging.handlers.TimedRotatingFileHandler('./log/watch_all.log', when='D', interval=1, backupCount=7, atTime=datetime.time(0, 0, 0, 0))rf_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
f_handler = logging.FileHandler('./log/watch_error.log')f_handler.setLevel(logging.ERROR)f_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))
logger.addHandler(rf_handler)logger.addHandler(f_handler)
"""# 通过logger对象来记录日志信息logger.debug('debug message')logger.info('info message')logger.warning('warn message')logger.error('error message')logger.critical('critical message')"""
#检测文件watch_file = "sre.test"#实时检测目录monitor_dir = "/otp/five-minute-sre"#定义子进程运行时间长度,单位秒stop_time=600


def sync_file(source_file): """ 检测是否为目标文件,是的话进行对应操作 """ logger.info("开始文件处理前,检测步骤") create_file = source_file print("create a file : {}".format(create_file)) logger.info("create a file : {}".format(create_file)) if create_file == watch_file: print("休眠30秒,等待文件完成生成") logger.info("休眠30秒,等待文件完成生成") time.sleep(30) print("========发现文件,开始处理") logger.info("========发现文件,开始处理") else: print("非目标文件,跳过...") logger.info("非目标文件,跳过...")
class MyEventHandler(FileSystemEventHandler): # 文件移动 def on_moved(self, event): print("文件移动触发") print(event)
def on_created(self, event): print("文件创建触发") logger.info("文件创建触发") print(event) logger.info(event) if event.is_directory: print("directory created:{0}".format(event.src_path)) logger.info("directory created:{0}".format(event.src_path)) # run_modify(event.src_path) print(monitor_dir,event.src_path) if monitor_dir == event.src_path: return 0 pid=os.fork() fork_precess.append(pid) print("当前运行中子进程1 : {}".format(fork_precess)) logger.info("当前运行中子进程1 : {}".format(fork_precess)) if pid==0: print("执行子进程,子进程pid={pid},父进程ppid={ppid}".format(pid=os.getpid(),ppid=os.getppid())) logger.info("执行子进程,子进程pid={pid},父进程ppid={ppid}".format(pid=os.getpid(),ppid=os.getppid())) run_modify(event.src_path)
else: print("file created:{0}".format(event.src_path)) logger.info("file created:{0}".format(event.src_path)) logger.info("调用文件同步函数....") sync_file(event.src_path)

def on_deleted(self, event): print("文件删除触发") print(event)
def on_modified(self, event): print("文件编辑触发") print(event)
def run_modify(monitor_dir): observer = Observer() # 创建观察者对象 file_handler = MyEventHandler() # 创建事件处理对象 observer.schedule(file_handler, monitor_dir, False) # 向观察者对象绑定事件和目录 observer.start() # 启动 try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
def stop_fork_process(): """ 清理超时子进程,自定义超时时间:stop_time """ print("running.....") logger.info("清理进程启动.....") while True: time.sleep(stop_time) print("当前运行中子进程2 : {}".format(fork_precess)) logger.info("当前运行中子进程2 : {}".format(fork_precess)) if len(fork_precess) == 0: print("没有子进程运行中.....") logger.info("没有子进程运行中.....") else: drop_pre = fork_precess.pop() print("删除子进程:{},正在运行子进程: {}".format(drop_pre,fork_precess)) logger.info("删除子进程:{},正在运行子进程: {}".format(drop_pre,fork_precess)) try: os.kill(drop_pre, signal.SIGKILL) except Exception as e: fork_precess_exce.append(drop_pre) logger.info("杀死子进程异常,子进程pid :{},杀不死的子进程队列: {}".format(drop_pre,fork_precess_exce)) print("当前子进程运行队列深度: {}".format(len(fork_precess))) logger.info("当前子进程运行队列深度: {}".format(len(fork_precess)))
if __name__ == '__main__': #定义子进程队列 global fork_precess global fork_precess_exce fork_precess = [] fork_precess_exce = [] # 多线程 t = threading.Thread(name="monitor_main", target=run_modify, args=(monitor_dir,)) t.start() stop_fork_process()

threading.Thread:使用多线程方式启动watchdog监控进程,不阻塞后面的异步清理进程

pid=os.fork():创建子进程,用来监听新文件夹

stop_fork_process:清理超时子进程


请使用浏览器的分享功能分享到微信等