APScheduler定时任务工具


APScheduler 即可以单独使用,也可以用于常见的 Web 框架。

高级 Python 调度程序(APScheduler)一个 Python 库,可让您安排稍后执行的 Python 代码,可以是一次,也可以是定期执行。您可以根据需要随时添加新作业或删除旧作业。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。

定时任务APScheduler工具


1. 安装工具

APScheduler 工具的安装方式原来如此简单

  • 安装操作
# 直接安装即可
$ pip install apscheduler
# 下载安装即可
$ https://pypi.python.org/pypi/APScheduler/
$ python setup.py install
  • 简单使用
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def tick(text=None):
    print(f"Tick! The time is: {datetime.now()} - {text}")

if __name__ == '__main__':
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")

    # 使用date触发器
    scheduler.add_job(tick, 'date', run_date=date(2009, 11, 6), args=['hello'])
    # 使用interval触发器
    scheduler.add_job(tick, 'interval', hours=2, seconds=3)
    # 使用crontab触发器
    scheduler.add_job(tick, 'cron', hours=2, seconds=3)

    try:
        print('Press Ctrl+C to exit ...')
        scheduler.start()  # 启动任务
    except (KeyboardInterrupt, SystemExit):
        pass

2. 基本组件

基本组件分为: triggers | job stores | executors | schedulers

  • triggers(触发器)

触发器(triggers)提供的是 scheduling 的调度逻辑,并且每个 job 都有属于自己 triggers ,用于管理任务何时开始执行。除了在初始配置之外,triggers 完全是无状态的。

  • job stores(任务存储器)

任务存储器(job stores)主要包含的是 scheduling 的调度任务。默认情况下,job stores 会将任务存储在内存中,当然你也可以将任务存储在各种各样类型的数据库中,达到持久存储的效果。存储的时候会对数据进行序列化,再被加载的时候进行反序列化。除了默认情况下,job stores 不会将 job data 存储在内存中,而是充当中间人,用于在后端保存、加载、更新和搜索任务。job stores 绝对不会在 schedulers 之间共享数据的。

  • executors(执行器)

执行器(executors)是处理 job 任务如何运行的,通常将 job 指定的可调用对象提交给线程池或进程池来完成此操作。作业完成后,执行器会通知 scheduler 调度程序,调度程序随后会发出相应的事件。

  • schedulers(任务调度器)

任务调度器(schedulers)的作用就是将各个组件给调度起来。通常,你的应用程序中只会运行一个 schedulers。应用程序开发人员通常不直接处理任务存储(job stores),执行器(executors)和触发器(triggers)。相反,任务调度器(schedulers)提供了适当的接口来处理所有这些。配置 job storesexecutors 是通过 schedulers 完成的,添加、修改和删除作业也是如此。


3. 正确选择

Choosing the right scheduler, job store(s), executor(s) and trigger(s)

APScheduler定时任务工具


4. 调度配置

主要讲解的是任务调度程序(scheduler)的配置和使用

  • APScheduler 提供了许多不同的方法来配置任务调度程序(scheduler),这样就可以在任何环境中获得最大的灵活性。你可以使用字典配置;你也可以将选项作为关键字参数传递;你还可以先实例化调度程序,然后添加作业并配置调度程序。

  • 以下命令将为你创建一个名为 scheduler 的后台执行的任务调度程序,默认将任务存储在 MemoryJobStore 内存中,使用的是 ThreadPoolExecutor 线程池且默认最大线程数为 10 个。

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
  • 下面是三种添加任务调度程序(scheduler)配置的三种方式,我推荐使用第一种,因为其更加的直观且易于后续代码的修改的调整,所以其余两种我就不展示代码了。
    • a MongoDBJobStore named “mongo”
    • an SQLAlchemyJobStore named “default” (using SQLite)
    • a ThreadPoolExecutor named “default”, with a worker count of 20
    • a ProcessPoolExecutor named “processpool”, with a worker count of 5
    • UTC as the scheduler’s timezone
    • coalescing turned off for new jobs by default
    • a default maximum instance limit of 3 for new jobs
from pytz import utc
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.configure(
        jobstores=jobstores,
        executors=executors,
        job_defaults=job_defaults,
        timezone=utc
    )
    scheduler.add_executor('processpool')
    scheduler.add_job(tick, 'interval', seconds=3)  # 添加任务

    try:
        print('Press Ctrl+C to exit ...')
        scheduler.start()  # 启动任务
    except (KeyboardInterrupt, SystemExit):
        pass

5. 使用方法

主要介绍任务的添加、删除、启动、停止等方式

APScheduler定时任务工具

  • [1] 启动任务

实例化调度器之后,使用 start() 方法就可以开始跑预定义好的任务了。除了阻塞方式的 BlockingScheduler 模式以外的其他模式,调用之后将理解返回。之后,你可以继续对应用程序进行初始化以及向调度器添加任务。而对于 BlockingScheduler 模式,初始化之后,直接使用 start() 方法即可。

scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()
  • [2] 关闭任务

默认情况下,调度程序关闭其作业存储和执行程序,并等待直到所有当前正在执行的作业完成。如果您不想等待,可以执行以下操作:

scheduler.shutdown()
scheduler.shutdown(wait=False)
  • [3] 添加任务

添加任务有两种方式,第一种方式就是最常用 add_job() 方法,第二种方式是使用装饰器模式的 scheduled_job() 方法,用于基本不会变更的任务程序上。

需要额外注意的是,如果在 executorjob store 中使用序列化存储的话,那么在你添加任务的时候有些额外要求需要注意。第一是,任务 target 对象必须是全局内可以方法的;第二是,任务 target 的参数必须是可以进行序列化的。

在内置任务存储器中,只有 MemoryJobStore 不会序列化任务。在内置执行程序中,只有 ProcessPoolExecutor 会序列化任务。

scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3, id='tick')
  • [4] 删除任务

从调度程序中删除作业时,它将从其关联的作业存储中删除,并且不再执行。有两种方式可以实现这一目标:第一种方式,就是通过使用 remove_job() 方法删除任务 ID 或者删除任务别名;第二种方式,就是通过使用 add_job() 方法的 remove() 达到删除的目的。

# using an explicit job ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

# use remove() way
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
  • [5] 停止和恢复任务
# to pause a job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.pause()

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.pause()
# to resume a job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.resume_job('my_job_id')
scheduler.resume()

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.resume()
  • [6] 修改任务
# scheduler
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

# job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.modify(max_instances=6, name='Alternate name')
  • [7] 获取任务
# 获取任务列表
scheduler.print_jobs()
scheduler.get_jobs()
scheduler.get_job('tick')
  • [8] 打印日志

如果调度程序未按预期工作,则将 apscheduler 记录器的日志记录级别提高到 DEBUG 级别会很有帮助。如果您尚未首先启用日志记录,则可以执行以下操作:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

6. 实例说明

实例来自官方仓库的目录中,方便阅读

  • [1] executors/processpool.py
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

HIT_NUMBER = 0

executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(1)
}

def hit():
    global HIT_NUMBER
    HIT_NUMBER += 1
    print(f'Hit {HIT_NUMBER}! The time is {datetime.now()} ...')


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_executor('processpool')
    scheduler.configure(executors=executors)
    job = scheduler.add_job(hit, 'interval', seconds=1, id='hit')

    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
  • [2] jobstores/sqlalchemy.py
import sys
import os
from datetime import datetime, timedelta

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = {
    'sqlalchemy': SQLAlchemyJobStore(url='sqlite:///sqlalchemy_default.sqlite')
}


def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.configure(jobstores=jobstores)
    scheduler.add_executor('processpool')
    alarm_time = datetime.now() + timedelta(seconds=10)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])

    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
  • [3] misc/reference.py
import os

from apscheduler.schedulers.blocking import BlockingScheduler


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job('sys:stdout.write', 'interval', seconds=3, args=['tick\n'])
    print(f'Press Ctrl+C to exit')

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
  • [4] rpc/server.py
import rpyc
from rpyc.utils.server import ThreadedServer

from apscheduler.schedulers.background import BackgroundScheduler


def print_text(text):
    print(text)


class SchedulerService(rpyc.Service):
    def exposed_add_job(self, func, *args, **kwargs):
        return scheduler.add_job(func, *args, **kwargs)

    def exposed_modify_job(self, job_id, jobstore=None, **changes):
        return scheduler.modify_job(job_id, jobstore, **changes)

    def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
        return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)

    def exposed_pause_job(self, job_id, jobstore=None):
        return scheduler.pause_job(job_id, jobstore)

    def exposed_resume_job(self, job_id, jobstore=None):
        return scheduler.resume_job(job_id, jobstore)

    def exposed_remove_job(self, job_id, jobstore=None):
        scheduler.remove_job(job_id, jobstore)

    def exposed_get_job(self, job_id):
        return scheduler.get_job(job_id)

    def exposed_get_jobs(self, jobstore=None):
        return scheduler.get_jobs(jobstore)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()
  • [4] rpc/client.py
from time import sleep

import rpyc


conn = rpyc.connect('localhost', 12345)
job = conn.root.add_job('server:print_text', 'interval', args=['Hello, World'], seconds=2)
sleep(10)
conn.root.remove_job(job.id)
  • [5] schedulers/blocking.py
from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录