APScheduler
即可以单独使用,也可以用于常见的Web
框架。
高级 Python
调度程序(APScheduler
)一个 Python
库,可让您安排稍后执行的 Python
代码,可以是一次,也可以是定期执行。您可以根据需要随时添加新作业或删除旧作业。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。
1. 安装工具
APScheduler 工具的安装方式原来如此简单
- 安装操作
# 直接安装即可
$ pip install apscheduler
# 下载安装即可
$ https://pypi.python.org/pypi/APScheduler/
$ python setup.py install
- 简单使用
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
def tick(text=None):
print(f"Tick! The time is: {datetime.now()} - {text}")
if __name__ == '__main__':
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
# 使用date触发器
scheduler.add_job(tick, 'date', run_date=date(2009, 11, 6), args=['hello'])
# 使用interval触发器
scheduler.add_job(tick, 'interval', hours=2, seconds=3)
# 使用crontab触发器
scheduler.add_job(tick, 'cron', hours=2, seconds=3)
try:
print('Press Ctrl+C to exit ...')
scheduler.start() # 启动任务
except (KeyboardInterrupt, SystemExit):
pass
2. 基本组件
基本组件分为:
triggers
|job stores
|executors
|schedulers
- triggers(触发器)
触发器(triggers
)提供的是 scheduling
的调度逻辑,并且每个 job
都有属于自己 triggers
,用于管理任务何时开始执行。除了在初始配置之外,triggers
完全是无状态的。
- job stores(任务存储器)
任务存储器(job stores
)主要包含的是 scheduling
的调度任务。默认情况下,job stores
会将任务存储在内存中,当然你也可以将任务存储在各种各样类型的数据库中,达到持久存储的效果。存储的时候会对数据进行序列化,再被加载的时候进行反序列化。除了默认情况下,job stores
不会将 job data
存储在内存中,而是充当中间人,用于在后端保存、加载、更新和搜索任务。job stores
绝对不会在 schedulers
之间共享数据的。
- executors(执行器)
执行器(executors
)是处理 job
任务如何运行的,通常将 job
指定的可调用对象提交给线程池或进程池来完成此操作。作业完成后,执行器会通知 scheduler
调度程序,调度程序随后会发出相应的事件。
- schedulers(任务调度器)
任务调度器(schedulers
)的作用就是将各个组件给调度起来。通常,你的应用程序中只会运行一个 schedulers
。应用程序开发人员通常不直接处理任务存储(job stores
),执行器(executors
)和触发器(triggers
)。相反,任务调度器(schedulers
)提供了适当的接口来处理所有这些。配置 job stores
和 executors
是通过 schedulers
完成的,添加、修改和删除作业也是如此。
3. 正确选择
Choosing the right scheduler, job store(s), executor(s) and trigger(s)
apscheduler.events
apscheduler.executors.base
apscheduler.executors.debug
apscheduler.executors.pool
- 线程池执行器、进程池执行器
apscheduler.executors.asyncio
- asyncio 程序执行器
apscheduler.executors.gevent
- Gevent 程序执行器
apscheduler.executors.twisted
- Twisted 程序执行器
apscheduler.job
apscheduler.jobstores.base
apscheduler.jobstores.memory
- 没有序列化,任务存储在内存中,增删改查都是在内存中完成
apscheduler.jobstores.sqlalchemy
- 使用 SQLAlchemy 这个 ORM 框架作为存储方式
apscheduler.jobstores.redis
- 使用 redis 作为存储器
apscheduler.jobstores.mongodb
- 使用 mongodb 作为存储器
apscheduler.jobstores.rethinkdb
- 使用 rethinkdb 作为存储器
apscheduler.jobstores.zookeeper
- 使用 zookeeper 作为存储器
apscheduler.schedulers
apscheduler.schedulers.base
apscheduler.schedulers.blocking
- 适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回
apscheduler.schedulers.background
- 适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞
apscheduler.schedulers.asyncio
- 适用于使用了 Asyncio 模块的应用程序
apscheduler.schedulers.gevent
- 适用于使用 Gevent 模块的应用程序
apscheduler.schedulers.tornado
- 适用于构建 Tornado 的应用程序
apscheduler.schedulers.twisted
- 适用于构建 Twisted 的应用程序
apscheduler.triggers.base
apscheduler.triggers.cron
apscheduler.triggers.date
- date 触发器
apscheduler.triggers.interval
- interval 触发器
apscheduler.triggers.combining
- crontab 触发器
4. 调度配置
主要讲解的是任务调度程序(scheduler)的配置和使用
APScheduler
提供了许多不同的方法来配置任务调度程序(scheduler
),这样就可以在任何环境中获得最大的灵活性。你可以使用字典配置;你也可以将选项作为关键字参数传递;你还可以先实例化调度程序,然后添加作业并配置调度程序。以下命令将为你创建一个名为
scheduler
的后台执行的任务调度程序,默认将任务存储在MemoryJobStore
内存中,使用的是ThreadPoolExecutor
线程池且默认最大线程数为10
个。
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
- 下面是三种添加任务调度程序(
scheduler
)配置的三种方式,我推荐使用第一种,因为其更加的直观且易于后续代码的修改的调整,所以其余两种我就不展示代码了。- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.configure(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=utc
)
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3) # 添加任务
try:
print('Press Ctrl+C to exit ...')
scheduler.start() # 启动任务
except (KeyboardInterrupt, SystemExit):
pass
5. 使用方法
主要介绍任务的添加、删除、启动、停止等方式
- [1] 启动任务
实例化调度器之后,使用 start()
方法就可以开始跑预定义好的任务了。除了阻塞方式的 BlockingScheduler
模式以外的其他模式,调用之后将理解返回。之后,你可以继续对应用程序进行初始化以及向调度器添加任务。而对于 BlockingScheduler
模式,初始化之后,直接使用 start()
方法即可。
scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()
- [2] 关闭任务
默认情况下,调度程序关闭其作业存储和执行程序,并等待直到所有当前正在执行的作业完成。如果您不想等待,可以执行以下操作:
scheduler.shutdown()
scheduler.shutdown(wait=False)
- [3] 添加任务
添加任务有两种方式,第一种方式就是最常用 add_job()
方法,第二种方式是使用装饰器模式的 scheduled_job()
方法,用于基本不会变更的任务程序上。
需要额外注意的是,如果在 executor
或 job store
中使用序列化存储的话,那么在你添加任务的时候有些额外要求需要注意。第一是,任务 target
对象必须是全局内可以方法的;第二是,任务 target
的参数必须是可以进行序列化的。
在内置任务存储器中,只有 MemoryJobStore
不会序列化任务。在内置执行程序中,只有 ProcessPoolExecutor
会序列化任务。
scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3, id='tick')
- [4] 删除任务
从调度程序中删除作业时,它将从其关联的作业存储中删除,并且不再执行。有两种方式可以实现这一目标:第一种方式,就是通过使用 remove_job()
方法删除任务 ID
或者删除任务别名;第二种方式,就是通过使用 add_job()
方法的 remove()
达到删除的目的。
# using an explicit job ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
# use remove() way
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
- [5] 停止和恢复任务
# to pause a job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.pause()
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.pause()
# to resume a job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.resume_job('my_job_id')
scheduler.resume()
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.resume()
- [6] 修改任务
# scheduler
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
# job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.modify(max_instances=6, name='Alternate name')
- [7] 获取任务
# 获取任务列表
scheduler.print_jobs()
scheduler.get_jobs()
scheduler.get_job('tick')
- [8] 打印日志
如果调度程序未按预期工作,则将 apscheduler
记录器的日志记录级别提高到 DEBUG
级别会很有帮助。如果您尚未首先启用日志记录,则可以执行以下操作:
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
6. 实例说明
实例来自官方仓库的目录中,方便阅读
- [1] executors/processpool.py
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
HIT_NUMBER = 0
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(1)
}
def hit():
global HIT_NUMBER
HIT_NUMBER += 1
print(f'Hit {HIT_NUMBER}! The time is {datetime.now()} ...')
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.configure(executors=executors)
job = scheduler.add_job(hit, 'interval', seconds=1, id='hit')
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
- [2] jobstores/sqlalchemy.py
import sys
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'sqlalchemy': SQLAlchemyJobStore(url='sqlite:///sqlalchemy_default.sqlite')
}
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.configure(jobstores=jobstores)
scheduler.add_executor('processpool')
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
- [3] misc/reference.py
import os
from apscheduler.schedulers.blocking import BlockingScheduler
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job('sys:stdout.write', 'interval', seconds=3, args=['tick\n'])
print(f'Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
- [4] rpc/server.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
def print_text(text):
print(text)
class SchedulerService(rpyc.Service):
def exposed_add_job(self, func, *args, **kwargs):
return scheduler.add_job(func, *args, **kwargs)
def exposed_modify_job(self, job_id, jobstore=None, **changes):
return scheduler.modify_job(job_id, jobstore, **changes)
def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)
def exposed_pause_job(self, job_id, jobstore=None):
return scheduler.pause_job(job_id, jobstore)
def exposed_resume_job(self, job_id, jobstore=None):
return scheduler.resume_job(job_id, jobstore)
def exposed_remove_job(self, job_id, jobstore=None):
scheduler.remove_job(job_id, jobstore)
def exposed_get_job(self, job_id):
return scheduler.get_job(job_id)
def exposed_get_jobs(self, jobstore=None):
return scheduler.get_jobs(jobstore)
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.start()
protocol_config = {'allow_public_attrs': True}
server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
try:
server.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
scheduler.shutdown()
- [4] rpc/client.py
from time import sleep
import rpyc
conn = rpyc.connect('localhost', 12345)
job = conn.root.add_job('server:print_text', 'interval', args=['Hello, World'], seconds=2)
sleep(10)
conn.root.remove_job(job.id)
- [5] schedulers/blocking.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass