1、前言
在日常的开发工作中,当我们的Api中有一个动作需要很长时间才能完成的时候,我们就可以将这个动作作为一个任务交给Celery去异步执行,执行完再将结果返回给用户。在这个过程中Celery就充当了一个任务调度的角色。
以上就是Celery的一个典型使用场景,Celery是一个基于分布式消息的任务队列,支持多种并发方式。
2、快速开始
2.1、创建虚拟环境
mkdir celeryy && cd celeryy
pdm init
2.2、安装Celery
pdm add celery
2.3、安装eventlet(windows)
pip install eventlet
2.4、启动Redis
cd redis
.\redis-server.exe redis.windows.conf
2.5、编写task文件
# celeryy/celery_task.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0',backend='redis://localhost:6379/1')
@app.task
def add(x, y):
return x + y
2.6、启动Celery应用
windows需要依赖eventlet模块启动。
cd celeryy
celery -A celery_task worker --loglevel=info -P eventlet
2.7、调用task
# celeryy/run.py
from celery_task import add
import time
t1 = time.time()
r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
while not r.ready():
pass
print(r.result)
t2 = time.time()
print('共耗时:%s' % str(t2-t1))
代码执行结果:
E:\celeryy>python run.py
3
6
9
12
15
共耗时:8.324045658111572
3、Celery原理分析
简单来说,Celery就是一个分布式的任务调度和执行器。即当我们有一些耗时的任务,我们可以将这些任务托管给Celery,然后需要调用的时候,直接在业务代码中导入这个被Celery托管的任务,然后利用这些任务自带的delay方法对齐进行调用(生产者),然后这些任务就会被Celery通过broker(Redis等消息代理)通知到Worker(消费者)进行执行,最后任务的执行结果会被保存到backend中去。
需要注意的是,当我们的业务接口A执行了创建了Celery的异步任务的同时,给任务一个默认的状态。后续我们可以根据任务的key去查询任务的状态。
3.1、任务状态查询
上面的代码中,我们使用了ready方法去检查任务状态。但是任务的执行结果都是会被保存到backend中的,我们可以登录到backend中去查看任务的状态。
可以通过delay的返回值对象的id属性去获取当前task的id。
r = add.delay(1,2)
print(r.id)
根据task的id去Redis(backend)中去查询这个task的执行结果:
127.0.0.1:6379[1]> get celery-task-meta-af1f9d6b-73d9-43f9-aa99-35a6918554a4
"{\"status\": \"SUCCESS\", \"result\": 15, \"traceback\": null, \"children\": [], \"date_done\": \"2022-06-15T09:24:48.470345\", \"task_id\": \"af1f9d6b-73d9-43f9-aa99-35a6918554a4\"}"
如此,我们就完整的使用Celery达到了任务异步执行以及结果刷新的目标。
评论区