Celery多任务PG卡死问题


纸上得来终觉浅,绝知此事要躬行。

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

Celery多任务PG卡死问题


1. 报错表象

日志虽然删除了敏感信息,但不影响问题的排除!

有一个测试环境发生报错,发现服务重启之后,可以正常使用一小段时间。但是,后续使用的话,就发现无法假死,不响应了。下面,是我排除日志发现的,异常表象。

  • [1] 执行 Celery 命令初始化 worker 任务
    • 命令:celery -A app.worker.task worker -c 4 -Q celery
# ==> 生成并初始化四个进程 <==
[11:53:44,396 MainProcess] basic.qos: prefetch_count->4
[11:53:44,396 MainProcess] to doing somethings ...
  • [2] 第一批取任务
    • 第一次四个进程都可以正常执行完成任务
    • 主进程收到任务 -> 从任务池中取到任务 -> 主进程接收任务 -> Fork 的子进程执行任务 -> 任务执行完成
# ==> 第一个进程执行队列任务(第一次执行) - 执行完成 <==
[11:54:27,435 MainProcess] Received task: worker.tasks.task-1 [771c4ae8-54bc-4686-be2a-3d34abf9143b]
[11:54:27,436 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:27,437 MainProcess] Task accepted: worker.tasks.task-1 [771c4ae8-54bc-4686-be2a-3d34abf9143b] pid:1179
[11:54:27,442 ForkPoolWorker-1] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app
[11:54:27,504 ForkPoolWorker-1] to doing somethings ...
[11:54:29,882 ForkPoolWorker-1] Task worker.tasks.task-1 [771c4ae8-54bc-4686-be2a-3d34abf9143b] succeeded

# ==> 第二个进程执行队列任务(第一次执行) - 执行完成 <==
[11:54:29,792 MainProcess] Received task: worker.tasks.task-2 [01faa7b3-914b-4977-8f4c-d902942ea5f5]
[11:54:29,794 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:29,795 MainProcess] Task accepted: worker.tasks.task-2 [01faa7b3-914b-4977-8f4c-d902942ea5f5] pid:1180
[11:54:29,799 ForkPoolWorker-2] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app
[11:54:29,827 ForkPoolWorker-2] to doing somethings ...
[11:54:31,863 ForkPoolWorker-2] Task worker.tasks.task-2 [01faa7b3-914b-4977-8f4c-d902942ea5f5] succeeded

# ==> 第三个进程执行队列任务(第一次执行) - 执行完成 <==
[11:54:29,867 MainProcess] Received task: worker.tasks.task-3 [15388fbe-02fe-4773-adfd-97e4f6183eb7]
[11:54:29,869 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:29,872 MainProcess] Task accepted: worker.tasks.task-3 [02db9241-b0d9-4858-9769-3c4663a683e4] pid:1181
[11:54:29,875 ForkPoolWorker-3] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app
[11:54:29,902 ForkPoolWorker-3] to doing somethings ...
[11:54:32,002 ForkPoolWorker-3] Task worker.tasks.task-3 [02db9241-b0d9-4858-9769-3c4663a683e4] succeeded

# ==> 第四个进程执行队列任务(第一次执行) - 执行完成 <==
[11:54:29,870 MainProcess] Received task: worker.tasks.task-4 [02db9241-b0d9-4858-9769-3c4663a683e4]
[11:54:29,870 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:29,872 MainProcess] Task accepted: worker.tasks.task-4 [15388fbe-02fe-4773-adfd-97e4f6183eb7] pid:1182
[11:54:29,875 ForkPoolWorker-4] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app
[11:54:29,974 ForkPoolWorker-4] to doing somethings ...
[11:54:35,016 ForkPoolWorker-4] Task worker.tasks.task-4 [15388fbe-02fe-4773-adfd-97e4f6183eb7] succeeded
  • [3] 第二批取任务
    • 第二次四个进程都执行到初始化数据库的时候给卡住了
    • 主进程收到任务 -> 从任务池中取到任务 -> 主进程接收任务 -> 初始化数据库 -> 卡住
# ==> 第一个进程执行队列任务(第二次执行) - 初始化数据库卡住 <==
[11:54:29,871 MainProcess] Received task: worker.tasks.task-5 [c16d753b-0d17-47a1-93a7-dadc02f04624]
[11:54:29,883 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:29,884 MainProcess] Task accepted: worker.tasks.task-5 [c16d753b-0d17-47a1-93a7-dadc02f04624] pid:1179
[11:54:29,886 ForkPoolWorker-1] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app

# ==> 第二个进程执行队列任务(第二次执行) - 初始化数据库卡住 <==
[11:59:22,979 MainProcess] Received task: worker.tasks.task-8 [6d6c89a9-3ac0-490b-8372-2af415992e99]
[11:59:22,980 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:59:22,981 MainProcess] Task accepted: worker.tasks.task-8 [6d6c89a9-3ac0-490b-8372-2af415992e99] pid:1180
[11:59:22,982 ForkPoolWorker-2] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app

# ==> 第三个进程执行队列任务(第二次执行) - 初始化数据库卡住 <==
[11:58:46,059 MainProcess] Received task: worker.tasks.task-7 [67ae30d2-29f0-4f9c-beb4-de5777783dba]
[11:58:46,060 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:58:46,061 MainProcess] Task accepted: worker.tasks.task-7 [67ae30d2-29f0-4f9c-beb4-de5777783dba] pid:1181
[11:58:46,063 ForkPoolWorker-3] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app

# ==> 第四个进程执行队列任务(第二次执行) - 初始化数据库卡住 <==
[11:54:38,314 MainProcess] Received task: worker.tasks.task-6 [1e99f261-9456-41cc-b403-7bee0924a607]
[11:54:38,315 MainProcess] TaskPool: Apply <function _app_run_task at  0x7fd8f000048> ()
[11:54:38,315 MainProcess] Task accepted: worker.tasks.task-6 [1e99f261-9456-41cc-b403-7bee0924a607] pid:1182
[11:54:38,316 ForkPoolWorker-4] connecting to database: host=postgres port=5432 dbname=app user=postgres password=app
  • [4] 第三批取任务
    • 第三次四个进程都直接卡死
    • 主进程收到任务 -> 卡住
# ==> 四个进程第三次执行的时候直接卡死 <==
[12:00:29,708 MainProcess] Received task: worker.tasks.task-9 [47518ef0-c831-4acb-a59d-a2e04e8eb984]

2. 问题排除

celery -A app.worker.task worker -c 4 -Q celery

随即看了下,这个 celeryworker 到底干了什么事情。下面是大致的代码,可以看到就是简简单单的初始化了一个 Celery 的对象、配置了下,然后就开始执行 Redis 里面的队列任务以及定时任务等。看着都挺正常的,并没有什么使用或者配置不对的情况,看来并不是这里的原因。

  • 任务执行时会调用数据库
from celery import Celery

app = Celery("xxx", broker="", include="")
default_queue = {'queue': "celery"}

app.conf.update(
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_prefetch_multiplier=1,
      ......
)

if __name__ == '__main__':
    app.start()

因为跑得 worker 任务需要连接数据库进行操作,所以看下下面这个 db.pool 的初始化操作,可以看到是通过 momoko 这个第三方库进行连接的,并且 Web 框架使用的是 Tornado。再结合上述日志的情况,可以看到,直接原因就是 IOloop 卡住了,新来的任务没办法获取到运行权,卡住了。

  • 连接数据库初始化
import logging

import momoko
from tornado.ioloop import IOLoop
from app import config


worker_db_conn = None

def init_db(ioloop=None, sync=False, max_size=1, **kwargs):
    if worker_db_conn is not None:
        return worker_db_conn

    ioloop = ioloop or IOLoop.current()
    dsn = " ".join("{}={}".format(key, value) for key, value in config.get_config("db").items() if value)
    logging.debug("connecting to database: %s", dsn)
    _db = momoko.Pool(
        dsn=dsn,
        size=1,
        max_size=max_size,
        auto_shrink=True,
        ioloop=ioloop,
        ......
    )

    if sync:
        ioloop.run_sync(_db.connect)
    else:
        _db.connect()
    return _db

其实,排除了很久都没有发现,是什么原因导致出现上述问题的。加之,该项目使用的 Web 框架 Tornado 近期也做了升级,随即考虑排除下使用的第三方库来看看是不是因为这个原因引起的。后来翻看发现了 momoko 这个第三方库,在 Tornado5.x 以后不再兼容 nomoko 了,导致 worker 执行完一次任务后就卡死。但是替换这个第三方库一时半会还有些麻烦,所以这里在 Celery 中临时加了下面这个参数配置,即执行一次后就销毁 worker 执行,从而间接的避免队列卡死的情况。

  • 临时的处理方式
    • Tornado 4.x -> Tornado 5.x
    • asyncio -> yield from -> async/await
app.conf.update(
    # 执行若干次任务后销毁worker
    worker_max_tasks_per_child=1,
      ......
)

到这里问题已经基本清楚了,但是上面的解决方案还是很鸡肋,所以要想彻底进行处理,就是不再使用 momoko 这个库,而是使用其他的方式或者库进行替代。但是,这里还是要感慨一下,开源软件的痛点 “后期社区不活跃 —> 软件不再维护 —> 废弃” 的这一过程。

  • 永久的处理方式 - 下面是 momoko 这个库的现状

Celery多任务PG卡死问题

Celery多任务PG卡死问题


3. 写在最后

思考问题与总结教训,防止后续再次发生!

到这里,其实已经不用在纠结这个问题真实原因了,需要思考的就是:在以后新建项目进行技术选型的时候,一定要找那些 经得起时间考虑 的第三方依赖和框架,不然等它们跑路了,可能你已经积重难返,奈何需要自己来 fork 一份代码,后续自行进行后续开发和维护工作。我们都知道,这将是一件非常痛苦的事情。


文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录