纸上得来终觉浅,绝知此事要躬行。
1. 问题起因
该问题遇到过两次,可以暂时不解决。
我们在项目中使用 Redis
作为中间件 Broker
来使用 Celery
的时候,发现有小概率会发生 Celery
会将同一个任务重复执行两遍。即相同的任务在不同的 worker
中被分别执行,并且时间只相差十几毫秒。这样会导致任务被重复执行影响执行效率,并且容易出现 Bug
,比如两个任务同时创建一个文件就会报错等。 一直以为是我们自己内部使用的方式不对导致的,但是看到网上有很多人遇到了相同的问题,官方提及预计在 4.5
和 4.4.x
的版本中进行修复。
# 项目中使用版本
celery==4.3.0 # BSD
redis==3.2.1 # MIT
2. 解决方法
不急的话,等等官方的解决方法吧,这样不会引入额外依赖。
在没有得到官方的解决之前,现在解决改问题的方法主要就是需要自己实现了一个锁来防止任务的多次分发。查了查,发现网上已经有人做了相关的实现了,那就是 celery-once
项目,而且用的人还比较多的。
# requirement celery 4.0+
pip install -U celery_once
其实 celery-once
也是利用 Redis
加锁来实现的,它的使用非常简单,参照 GitHub
中的使用介绍就可以上手了。celery-once
在 Task
类基础上实现了 QueueOnce
类,该类提供了任务去重的功能。
在使用之前,需要在 Celery
配置文件中配置一些选项才能正常工作。
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
@celery.task(base=QueueOnce)
def slow_task():
sleep(30)
return "Done!"
在使用时,我们自己实现的 task
方法需要继承 QueueOnce
作为抽象基类,后面的 once
参数表示在遇到重复方法时的处理方式。默认 graceful
为 False
,表示 Celery
会抛出 AlreadyQueued
异常,手动设置为 True
,则会静默处理。
@task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
sleep(30)
return a + b
在运行任务时,celery_once
会先检查是否存在锁。 如果没有锁,则任务将正常运行。 一旦任务完成或由于异常而终止,锁将清除。 如果尝试在任务完成之前再次运行该任务,则会引发 AlreadyQueued
异常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
默认情况下,celery_once
基于任务的名称及其参数和值创建一个锁。如果,使用不同的参数运行任务将默认检查不同的锁。
slow_add(1, 1)
slow_add(1, 2)
3. 相关链接
授人玫瑰,手有余香!
官方仓库地址
相关讨论问题