celery支持不同的调度方式,我这里主要介绍以下4点内容,
1.等待调用结果
2.并行的调用
3.串行的调用
4.并行、串行的混合调用
还有其他的调用方式,可以参考文档
http://docs.celeryproject.org/en/latest/userguide/canvas.html#canvas-map
注意以下的示例为了演示方便都是在一台机器上调度,但是经过测试是可以在多台机器运行的。
0.相关准备工作
如何启动服务,可以参考我的前一篇博客Celery的基础功能介绍
目录结构
celery
celery/proj
celery/proj/celery.py
celery/proj/tasks.py
主要使用这个函数测试(celery\proj\tasks.py中定义)
@app.task
def testsleep(number):
time.sleep(number)
print('sleep:',number)
return number
先启动服务
cd celery
celery -A proj worker -l info
1.等待调用结果
from proj.tasks import testsleep
result = testsleep.delay(30)
result.ready()
result.get() #等待30秒结束后才返回
2.并行的调用
from celery import group
res1 = group(testsleep.s(5),testsleep.s(15),testsleep.s(30))()
res1.get()
结果:
3个函数分别在6、7、8的worker中执行,而且后一个函数和前一个分别间隔10秒、间隔15秒结束,说明确实是并行
3.串行的调用
from celery import chain
res2 = chain(testsleep.s(10),testsleep.s(),testsleep.s())()
res2.get()
监控结果:
都在worker10下执行,
其中要注意函数的写法,前一个返回值是后一个输入,如果写上参数会报错
res2 = chain(testsleep.s(10),testsleep.s(10),testsleep.s(10))()
#会报这些错误
AttributeError: 'AsyncResult' object has no attribute 'type'
4.并行、串行的混合调用
chaincall = chain(testsleep.s(3),testsleep.s())
groupresult = group(testsleep.s(4),testsleep.s(5),chaincall)()
groupresult.get()
注意如果这样写
chaincall = chain(testsleep.s(3),testsleep.s())
groupcall = group(testsleep.s(4),testsleep.s(5),chaincall)
#函数是可以多次调用的
chaincall().get()
groupcall().get()
5.总结
串行或并行实际上都可以通过get函数处理,
这些主要是处理类似这些功能的
group(add.s(i, i) for i in xrange(100))()