Celery重复执行同一个任务


纸上得来终觉浅,绝知此事要躬行。

Celery重复执行同一个任务


1. 问题起因

该问题遇到过两次,可以暂时不解决。

我们在项目中使用 Redis 作为中间件 Broker 来使用 Celery 的时候,发现有小概率会发生 Celery 会将同一个任务重复执行两遍。即相同的任务在不同的 worker 中被分别执行,并且时间只相差十几毫秒。这样会导致任务被重复执行影响执行效率,并且容易出现 Bug,比如两个任务同时创建一个文件就会报错等。 一直以为是我们自己内部使用的方式不对导致的,但是看到网上有很多人遇到了相同的问题,官方提及预计在 4.54.4.x 的版本中进行修复。

# 项目中使用版本
celery==4.3.0  # BSD
redis==3.2.1   # MIT

Celery重复执行同一个任务


2. 解决方法

不急的话,等等官方的解决方法吧,这样不会引入额外依赖。

在没有得到官方的解决之前,现在解决改问题的方法主要就是需要自己实现了一个锁来防止任务的多次分发。查了查,发现网上已经有人做了相关的实现了,那就是 celery-once 项目,而且用的人还比较多的。

# requirement celery 4.0+
pip install -U celery_once

其实 celery-once 也是利用 Redis 加锁来实现的,它的使用非常简单,参照 GitHub 中的使用介绍就可以上手了。celery-onceTask 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能。

在使用之前,需要在 Celery 配置文件中配置一些选项才能正常工作。

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

@celery.task(base=QueueOnce)
def slow_task():
    sleep(30)
    return "Done!"

在使用时,我们自己实现的 task 方法需要继承 QueueOnce 作为抽象基类,后面的 once 参数表示在遇到重复方法时的处理方式。默认 gracefulFalse,表示 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则会静默处理。

@task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

在运行任务时,celery_once 会先检查是否存在锁。 如果没有锁,则任务将正常运行。 一旦任务完成或由于异常而终止,锁将清除。 如果尝试在任务完成之前再次运行该任务,则会引发 AlreadyQueued 异常。

example.delay(10)
example.delay(10)
Traceback (most recent call last):
    ..
AlreadyQueued()

默认情况下,celery_once 基于任务的名称及其参数和值创建一个锁。如果,使用不同的参数运行任务将默认检查不同的锁。

slow_add(1, 1)
slow_add(1, 2)

3. 相关链接

授人玫瑰,手有余香!

官方仓库地址

相关讨论问题


文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录