Celery简介

官方文档:https://docs.celeryproject.org/en/stable/
Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。
这是一个任务队列,着重于实时处理,同时还支持任务调度。

功能

  • 消息队列

    • 异步任务
    • 定时任务

    使用所需了解的知识

  • 选择并安装消息容器(载体)
  • 安装Celery,创建任务
  • 开启工作进程并且调用任务
  • 记录工作状态和返回工作结果/异常

Celery演示

文档:https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html

安装消息容器

选择消息容器,可选择RabbitMQ,或者redis,还有其他的容器。

这里选择Redis作为实验。

安装python模块:

pip install -U "celery[redis]"

安装Celery

pip install celery==3.1.25

创建应用任务

新建tasks.py:

from celery import Celery
# 创建celery对象,tasks名字,broker指消息载体。
app = Celery('tasks', broker='redis://localhost:6379/1')

@app.task  # 使用celery对象做成装饰器,再调用对象的task属性,装饰add函数
def add(a, b):
    sleep(5)  # 假装该任务很复杂,很耗时
    return a+b

运行celery worker server

celery -A tasks worker --loglevel=INFO

log级别是info

补充:log级别

  • info
  • debug
  • warning

    • 警告
  • error

    • 错误
  • critical

    • 严重错误,会影响其他模块工作

window10运行注意

需要添加额外的参数运行,否则任务不会执行,只会放在队列中。

方法1

添加--pool=solo参数

celery -A tasks worker --loglevel=INFO --pool=solo

方法2

首先安装gevent模块,然后在启动celery的时候添加gevent参数

pip install gevent
celery -A tasks worker --loglevel=INFO -P gevent

运行返回信息:

(venv) E:\djstudy\day15\RESTend\doc\code>celery -A tasks worker --loglevel=INFO --pool=solo

 -------------- celery@DESKTOP-MOK5PHF v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-02-06 19:11:24
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x47c3db0
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2021-02-06 19:11:24,323: INFO/MainProcess] Connected to redis://localhost:6379/1
[2021-02-06 19:11:24,330: INFO/MainProcess] mingle: searching for neighbors
[2021-02-06 19:11:25,352: INFO/MainProcess] mingle: all alone
[2021-02-06 19:11:25,364: INFO/MainProcess] celery@DESKTOP-MOK5PHF ready.

调用异步任务

原生写法

在tasks.py中调用add函数:

if __name__ == '__main__':
    print(add(4,6))

然后运行文件,发现5秒后打印结果10。

但是实际生产环境中,发送邮件不需要等待发送完毕才能进行下一步,论文查重不需要一直守着等待结果出来,而是返回提示3小时后来查看结果。

异步写法

if __name__ == '__main__':
    # print(add(4,6))
    print(add.delay(4, 6))

运行结果:


结果返回是一串uuid唯一标识

回到Terminate处,经过5秒可以看到终端打印出了结果:

[2021-02-06 19:21:28,841: INFO/MainProcess] Received task: tasks.add[c4583294-f969-44c8-9374-a1ec97e1c841]
[2021-02-06 19:21:33,843: INFO/MainProcess] Task tasks.add[c4583294-f969-44c8-9374-a1ec97e1c841] succeeded in 5.0s: 10

之后如果再来查询结果的时候可以直接查找id号即可,以上就是最简单的异步任务。

最后修改:2024 年 03 月 13 日
如果觉得我的文章对你有用,请随意赞赏