在日常工作中,会遇到需要检测某个文件夹下的变动情况,再根据对应的行为做出不同的操作。
在python中可以使用第三方模块watchdog 来实现,下面是具体的介绍与使用

watchdog安装
watchdog 是一个实时监控库,其原理是通过操作系统的时间触发,需要循环等待。
官方文档:https://python-watchdog.readthedocs.io/en/stable/
pip install watchdog我们以官网提供的简单实例入手
import sysimport timeimport loggingfrom watchdog.observers import Observerfrom watchdog.events import LoggingEventHandlerif __name__ == "__main__":logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S')path = sys.argv[1] if len(sys.argv) > 1 else '.'# 内置的LoggingEventHandlerevent_handler = LoggingEventHandler()observer = Observer()observer.schedule(event_handler, path, recursive=True)observer.start()try:while True:time.sleep(1)finally:observer.stop()observer.join()
observer = Observer():创建一个观察者对象。
observer.schedule():声明一个定时任务。
observer.start():启动定时任务。
文件系统事件的基类类型定义
watchdog.events.FileSystemEvent(event_type,src_path,is_directory=False):
event.event_type:时间类别,moved deleted created modified
event.src_path:触发该事件的文件或者目录的路径
event.is_directory:该事件是否由一个目录触发
由watchdog.events.FileSystemEvent基类派生的子类
watchdog.events.FileDeletedEvent() 文件被删除时触发该事件
watchdog.events.DirDeletedEvent() 目录被删除时触发该事件
watchdog.events.DirCreatedEvent() 目录被建立时触发该事件
watchdog.events.FileCreatedEvent() 文件被建立时触发该事件
watchdog.events.FileModifiedEvent() 文件被修改时触发该事件
watchdog.events.DirModifiedEvent() 目录被修改触发该事件
watchdog.events.FileMovedEvent() 文件被移动触发该事件
watchdog.events.DirMovedEvent() 目录被移动触发该事件
observer.schedule(event_handler, path, recursive=False) 的详细说明
每一次调用schedule() 对一个路径( path )进行监控处理叫做 watch , schedule() 方法会返回这个 watch ,我们可以对 watch 增加多个 event 事件处理器。
observer.add_handler_for_watch(event_handler, watch):添加1个新的事件处理器到 ;observer.remove_handler_for_watch(event_handler, watch):从watch移除1个事件处理器;observer.unschedule(watch):移除1个watch及其所有事件处理器;observer.unschedule_all():移除所有watch及关联的事件处理器;observer.on_thread_stop():等同于observer.stop()
自定义监听器,然后根据触发的事件进行操作
import datetimeimport timefrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerclass MyEventHandler(FileSystemEventHandler):def __init__(self):FileSystemEventHandler.__init__(self)def on_any_event(self, event):print("-----")print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))# 移动def on_moved(self, event):if event.is_directory:print("目录 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))else:print("文件 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))# 新建def on_created(self, event):if event.is_directory:print("目录 created:{file_path}".format(file_path=event.src_path))else:print("文件 created:{file_path}".format(file_path=event.src_path))# 删除def on_deleted(self, event):if event.is_directory:print("目录 deleted:{file_path}".format(file_path=event.src_path))else:print("文件 deleted:{file_path}".format(file_path=event.src_path))# 修改def on_modified(self, event):if event.is_directory:print("目录 modified:{file_path}".format(file_path=event.src_path))else:print("文件 modified:{file_path}".format(file_path=event.src_path))if __name__ == '__main__':path = "/otp/five-minute-sre"myEventHandler = MyEventHandler()# 观察者observer = Observer()# recursive:True 递归的检测文件夹下所有文件变化。observer.schedule(myEventHandler, path, recursive=True)# 观察线程,非阻塞式的。observer.start()try:while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()
在实际使用的时候会发现,新建一个文件,可以看到触发了多个事件。
就是FileCreatedEvent , FileModifiedEvent 事件,对应上面代码中 on_created 和 on_modified 的方法被调用
其原因在于 f = open("file.txt", "w") 这样文件创建动作会触发 FileCreatedEvent 事件,执行 on_created 函数,往文件写数据的 f.flush() 和 f.close() 操作,会触发 FileModifiedEvent 事件,执行 on_modified 函数,所以触发了 3次。
写一个用例的demo脚本
需求:监听指定目录下是否新有创建文件夹,然后对新创建的文件启动新的监听,最后对文件夹下的文件做处理的处理
#!/usr/bin/python"""用于实时检测文件生成,并且做对应的处理"""import time,threading,osfrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerimport queueimport signalimport shutil,subprocessimport loggingimport logging.handlersimport datetime"""定义日志格式"""# 创建Logger并进行设置logger = logging.getLogger('SYNC DAS FILE')logger.setLevel(logging.DEBUG)#一天轮转一次日志,保留7天日志rf_handler = logging.handlers.TimedRotatingFileHandler('./log/watch_all.log', when='D', interval=1, backupCount=7, atTime=datetime.time(0, 0, 0, 0))rf_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))f_handler = logging.FileHandler('./log/watch_error.log')f_handler.setLevel(logging.ERROR)f_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"))logger.addHandler(rf_handler)logger.addHandler(f_handler)"""# 通过logger对象来记录日志信息logger.debug('debug message')logger.info('info message')logger.warning('warn message')logger.error('error message')logger.critical('critical message')"""#检测文件watch_file = "sre.test"#实时检测目录monitor_dir = "/otp/five-minute-sre"#定义子进程运行时间长度,单位秒stop_time=600def sync_file(source_file):"""检测是否为目标文件,是的话进行对应操作"""logger.info("开始文件处理前,检测步骤")create_file = source_fileprint("create a file : {}".format(create_file))logger.info("create a file : {}".format(create_file))if create_file == watch_file:print("休眠30秒,等待文件完成生成")logger.info("休眠30秒,等待文件完成生成")time.sleep(30)print("========发现文件,开始处理")logger.info("========发现文件,开始处理")else:print("非目标文件,跳过...")logger.info("非目标文件,跳过...")class MyEventHandler(FileSystemEventHandler):# 文件移动def on_moved(self, event):print("文件移动触发")print(event)def on_created(self, event):print("文件创建触发")logger.info("文件创建触发")print(event)logger.info(event)if event.is_directory:print("directory created:{0}".format(event.src_path))logger.info("directory created:{0}".format(event.src_path))# run_modify(event.src_path)print(monitor_dir,event.src_path)if monitor_dir == event.src_path:return 0pid=os.fork()fork_precess.append(pid)print("当前运行中子进程1 : {}".format(fork_precess))logger.info("当前运行中子进程1 : {}".format(fork_precess))if pid==0:print("执行子进程,子进程pid={pid},父进程ppid={ppid}".format(pid=os.getpid(),ppid=os.getppid()))logger.info("执行子进程,子进程pid={pid},父进程ppid={ppid}".format(pid=os.getpid(),ppid=os.getppid()))run_modify(event.src_path)else:print("file created:{0}".format(event.src_path))logger.info("file created:{0}".format(event.src_path))logger.info("调用文件同步函数....")sync_file(event.src_path)def on_deleted(self, event):print("文件删除触发")print(event)def on_modified(self, event):print("文件编辑触发")print(event)def run_modify(monitor_dir):observer = Observer() # 创建观察者对象file_handler = MyEventHandler() # 创建事件处理对象observer.schedule(file_handler, monitor_dir, False) # 向观察者对象绑定事件和目录observer.start() # 启动try:while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()def stop_fork_process():"""清理超时子进程,自定义超时时间:stop_time"""print("running.....")logger.info("清理进程启动.....")while True:time.sleep(stop_time)print("当前运行中子进程2 : {}".format(fork_precess))logger.info("当前运行中子进程2 : {}".format(fork_precess))if len(fork_precess) == 0:print("没有子进程运行中.....")logger.info("没有子进程运行中.....")else:drop_pre = fork_precess.pop()print("删除子进程:{},正在运行子进程: {}".format(drop_pre,fork_precess))logger.info("删除子进程:{},正在运行子进程: {}".format(drop_pre,fork_precess))try:os.kill(drop_pre, signal.SIGKILL)except Exception as e:fork_precess_exce.append(drop_pre)logger.info("杀死子进程异常,子进程pid :{},杀不死的子进程队列: {}".format(drop_pre,fork_precess_exce))print("当前子进程运行队列深度: {}".format(len(fork_precess)))logger.info("当前子进程运行队列深度: {}".format(len(fork_precess)))if __name__ == '__main__':#定义子进程队列global fork_precessglobal fork_precess_excefork_precess = []fork_precess_exce = []# 多线程t = threading.Thread(name="monitor_main", target=run_modify,args=(monitor_dir,))t.start()stop_fork_process()
threading.Thread:使用多线程方式启动watchdog监控进程,不阻塞后面的异步清理进程
pid=os.fork():创建子进程,用来监听新文件夹
stop_fork_process:清理超时子进程