python 定时任务框架APScheduler与Django APScheduler

APScheduler简介

在运维项目开发中,时常需要用到定时任务,python有多种定时任务的解决方案,这里我们来一起学习其中一个解决方案,python轻型任务框架:APScheduler

APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。

APScheduler有以下三个特点:

  • 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)

  • 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)

  • 一次性执行任务(在设定的日期/时间运行一次任务)

APScheduler有四种组成部分:

  • 触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。

  • 作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。

  • 执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

  • 调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 通过配置executor、jobstore、trigger,使用线程池(ThreadPoolExecutor默认值20)或进程池(ProcessPoolExecutor 默认值5)并且默认最多3个(max_instances)任务实例同时运行,实现对job的增删改查等调度控制


快速上手案例

安装

pip install apscheduler

简单案例


from apscheduler.schedulers.blocking import BlockingSchedulerfrom datetime import datetime# 执行的任务函数def job():    print("现在时间是:",datetime.now().strftime("%Y-%m-%d %H:%M:%S"))    print("让我们一起进步吧!")
# 创建阻塞式调度器BlockingSchedulersched = BlockingScheduler()# 添加任务sched.add_job(job, 'interval', seconds=5, id='my_job_id') # 利用内置触发器(trigger)添加定时任务# 执行任务sched.start()

执行效果

现在时间是:2022-09-21 21:07:47让我们一起进步吧!现在时间是:2022-09-21 21:07:52让我们一起进步吧!现在时间是:2022-09-21 21:07:57让我们一起进步吧!

以上代码实现每5秒钟输出一次当前时间

对应APScheduler的概念,即一次完整的定时任务执行包含了:

(1)定义定时任务job()(2)创建调度器

(3)通过add_job添加到作业存储,添加时利用触发器明确触发条件,

(4)通过调度器执行作业存储中的定时任务(此处执行器是用默认的)

APScheduler中的重要概念

触发器(trigger)

add_job的第二个参数是trigger,它管理着作业的调度方式。它可以为date, interval或者cron。对于不同的trigger,对应的参数也相同。

1. date触发器:

在某个日期时间只触发一次事件。示例代码如下:


run_date (datetime|str) – the date/time to run the job at  -(任务开始的时间)timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already


# The job will be executed on November 6th, 2009sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])# The job will be executed on November 6th, 2009 at 16:30:05sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

2,cron定时调度(某一定时时刻执行)

在某个确切的时间周期性的触发事件。可以使用的参数如下:


(int|str) 表示参数既可以是int类型,也可以是str类型(datetime | str) 表示参数既可以是datetime类型,也可以是str类型
year (int|str) – 4-digit year -(表示四位数的年份,如2008年)month (int|str) – month (1-12) -(表示取值范围为1-12月)day (int|str) – day of the (1-31) -(表示取值范围为1-31日)week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)hour (int|str) – hour (0-23) - (表示取值范围为0-23时)minute (int|str) – minute (0-59) - (表示取值范围为0-59分)second (int|str) – second (0-59) - (表示取值范围为0-59秒)start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)

CronTrigger可用的表达式:

实例:


#表示2017年3月22日17时19分07秒执行该程序sched.add_job(my_job, 'cron', year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07)
#表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
#表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00sched.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2014-05-30')# 添加任务作业,args()中最后一个参数后面要有一个逗号,本任务设置在每天凌晨1:00:00执行scheduler.add_job(task, 'cron', hour='1', minute='0', second='0', args=("hello",))

#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5sched.add_job(my_job, 'cron',second = '*/5')

interval触发器:

想要在固定的时间间隔触发事件。interval的触发器可以设置以下的触发参数:

  1. weeks:周。整形。

  2. days:一个月中的第几天。整形。

  3. hours:小时。整形。

  4. minutes:分钟。整形。

  5. seconds:秒。整形。

  6. start_date:间隔触发的起始时间。

  7. end_date:间隔触发的结束时间。

  8. jitter:触发的时间误差。

实例


#表示每隔3天17时19分07秒执行一次任务sched.add_job(my_job, 'interval',days  = 03,hours = 17,minutes = 19,seconds = 07)

执行器(executor)

执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:

  1. ThreadPoolExecutor:线程池执行器。

  2. ProcessPoolExecutor:进程池执行器。

  3. GeventExecutor:Gevent程序执行器。

  4. TornadoExecutor:Tornado程序执行器。

  5. TwistedExecutor:Twisted程序执行器。

  6. AsyncIOExecutor:asyncio程序执行器。


调度器(scheduler)

  1. BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。

  2. BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。

  3. AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。

  4. GeventScheduler:适用于使用 gevent模块的应用程序。

  5. TwistedScheduler:适用于构建 Twisted的应用程序。

  6. QtScheduler:适用于构建 Qt的应用程序。

作业存储(job store)

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

比较常用的为BlockingScheduler和BackgroundScheduler

APScheduler支持的任务存储器有:

  • jobstores.memory:内存

  • jobstores.mongodb:存储在mongodb

  • jobstores.redis:存储在redis

  • jobstores.rethinkdb:存储在rethinkdb

  • jobstores.sqlalchemy:支持sqlalchemy的数据库如mysql,sqlite等

  • jobstores.zookeeper:zookeeper

不同的任务存储器可以在调度器的配置中进行配置(见调度器)

Scheduler的工作流程

Scheduler添加job流程:

Scheduler调度流程:

任务操作实例

添加作业

第一个实例是通过add_job()来添加作业,另外还有一种方式是通过scheduled_job()修饰器来修饰函数


import timefrom apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=5)def my_job(): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
sched.start()

移除作业


job = scheduler.add_job(myfunc, 'interval', minutes=2)job.remove()#如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效sched.add_job(myfunc, 'interval', minutes=2, id='my_job_id')sched.remove_job('my_job_id')
暂停和恢复作业

#暂停作业apsched.job.Job.pause()apsched.schedulers.base.BaseScheduler.pause_job()#恢复作业apsched.job.Job.resume()apsched.schedulers.base.BaseScheduler.resume_job()

获得job列表

获得调度作业的列表,可以使用get_jobs()来完成,它会返回所有的job实例。或者使用print_jobs()来输出所有格式化的作业列表。也可以利用get_job(任务ID)获取指定任务的作业列表


job = sched.add_job(my_job, 'interval', seconds=2 ,id='123')print sched.get_job(job_id='123')print sched.get_jobs()

关闭调度器

默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。


sched.shutdown()sched.shutdown(wait=False)

修改某个任务属性信息

使用 scheduler.modify_job(job_id,jobstore=None,**changes)。

Django框架中使用定时任务APScheduler

Django框架中使用定时任务APScheduler:django-apschedule。可以在django项目快速搭建定时任务框架与使用

一、安装模块


pip install django-apscheduler

二、配置

1.修改settings.py文件

INSTALLED_APPS中加入django-apscheduler应用:


INSTALLED_APPS = [    ......    'django_apscheduler',#定时执行任务]

2.执行迁移命令

在django项目的manage同级终端下:


python manage.py migrate

会自动生成两张表:django_apschedule_djangojob(存储任务)和django_apschedule_djangojobexecution(任务执行情况)

二、创建任务

两种创建任务的方法:装饰器和add_job函数。添加完成后,数据库中就可以显示

1,装饰器创建任务—适合于写代码的人自己创建任务

在任意view.py中实现代码


from apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
#开启定时工作try: # 实例化调度器 scheduler = BackgroundScheduler() # 调度器使用DjangoJobStore() scheduler.add_jobstore(DjangoJobStore(), "default") # 设置定时任务,选择方式为interval,时间间隔为10s # 另一种方式为每天固定时间执行任务,对应代码为: # @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time') @register_job(scheduler,"interval", seconds=10) def my_job(): # 这里写你要执行的任务 print("现在时间是:",datetime.now().strftime("%Y-%m-%d %H:%M:%S")) print("让我们一起进步吧!") register_events(scheduler) scheduler.start()except Exception as e: print(e) # 有错误就停止定时器 scheduler.shutdown()

2,add_job函数创建任务—可以作为定时调度平台或者监控系统用户自定义任务调度时间

如果想让用户通过页面输入参数,并提交来手动创建定时任务,就需要使用add_job函数。


import jsonfrom django.http import JsonResponsefrom apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_events, register_job

scheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), 'default')
# crondef insert_cron_job(json): # 传入 状态策略 开始任务 结束任务 # 接收参数 try: # 获取json中对应数据 # add_job添加任务,传入:my_job为要执行的定时任务,trigger指定任务触发器(triggers)—不同定时模式,args传参,id定时任务命名–id,等其他配置,根据trigger不同,传入不同的 时间格式进行触发 scheduler.add_job(my_job, trigger="", args=["s", ], id=start_job_id) except Exception as e: print("获取数据异常") return
# 具体要执行的代码def my_job(s): pass
register_events(scheduler)scheduler.start()

三,删除任务

删除完成后,可以查看数据库中对应的任务id已经删除完成

DjangoJobStore.remove_job(DjangoJobStore(), job_id=需要删除的job_id)


import jsonfrom django.http import JsonResponsefrom apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_events, register_job

scheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), 'default')
# crondef insert_cron_job(json): # 传入 状态策略 开始任务 结束任务 # 接收参数 try: # 获取json中对应数据 # add_job添加任务,传入:my_job为要执行的定时任务,trigger指定任务触发器(triggers)—不同定时模式,args传参,id定时任务命名–id,等其他配置,根据trigger不同,传入不同的 时间格式进行触发 scheduler.add_job(my_job, trigger="", args=["s", ], id=start_job_id) except Exception as e: print("获取数据异常") return
def delete_cron_job(json): # 接收参数 # 获取需要删除的job_id try: DjangoJobStore.remove_job(DjangoJobStore(), job_id=需要删除的job_id) except Exception as e: print("删除定时任务异常:", e)# 具体要执行的代码def my_job(s): pass
register_events(scheduler)scheduler.start()

请使用浏览器的分享功能分享到微信等