Python的分布式调度工具Celery--流程调度功能介绍

浏览: 3031

celery支持不同的调度方式,我这里主要介绍以下4点内容,

1.等待调用结果

2.并行的调用

3.串行的调用

4.并行、串行的混合调用

还有其他的调用方式,可以参考文档

http://docs.celeryproject.org/en/latest/userguide/canvas.html#canvas-map


注意以下的示例为了演示方便都是在一台机器上调度,但是经过测试是可以在多台机器运行的。

0.相关准备工作

如何启动服务,可以参考我的前一篇博客Celery的基础功能介绍

目录结构

celery
celery/proj
celery/proj/celery.py
celery/proj/tasks.py

主要使用这个函数测试(celery\proj\tasks.py中定义)

@app.task
def testsleep(number):
    time.sleep(number)
    print('sleep:',number)
    return number

先启动服务

cd celery
celery -A proj worker -l info

1.等待调用结果

from proj.tasks import testsleep
result = testsleep.delay(30)
result.ready()
result.get()  #等待30秒结束后才返回


2.并行的调用

from celery import group
res1 = group(testsleep.s(5),testsleep.s(15),testsleep.s(30))()
res1.get()

结果:


3个函数分别在6、7、8的worker中执行,而且后一个函数和前一个分别间隔10秒、间隔15秒结束,说明确实是并行

3.串行的调用

from celery import chain
res2 = chain(testsleep.s(10),testsleep.s(),testsleep.s())()
res2.get()

监控结果:


都在worker10下执行,

其中要注意函数的写法,前一个返回值是后一个输入,如果写上参数会报错

res2 = chain(testsleep.s(10),testsleep.s(10),testsleep.s(10))()
#会报这些错误
AttributeError: 'AsyncResult' object has no attribute 'type'

4.并行、串行的混合调用

chaincall = chain(testsleep.s(3),testsleep.s())
groupresult = group(testsleep.s(4),testsleep.s(5),chaincall)()
groupresult.get()

注意如果这样写

chaincall = chain(testsleep.s(3),testsleep.s())
groupcall = group(testsleep.s(4),testsleep.s(5),chaincall)
#函数是可以多次调用的
chaincall().get()
groupcall().get()

5.总结

串行或并行实际上都可以通过get函数处理,

这些主要是处理类似这些功能的

group(add.s(i, i) for i in xrange(100))()

推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册