Python的分布式调度工具Celery--queue实现按机器执行

浏览: 6676

某些情况下,某些任务需要在指定的一些机器上运行,

Celery是通过Exchanges, queues and routing keys这3个概念实现

这是官方文档的解释

Exchanges, queues and routing keys.
    Messages are sent to exchanges.
    An exchange routes messages to one or more queues. Several exchange types exists, providing different ways to do routing, or implementing different messaging scenarios.
    The message waits in the queue until someone consumes it.
    The message is deleted from the queue when it has been acknowledged.

原文见:http://docs.celeryproject.org/en/latest/userguide/routing.html?highlight=exchange_type#producers-consumers-and-brokers


我的理解时,Celery的不同worker是可以处理不同的queue的(也可以是routing_keyg或者2者结合),而quene是可以分配到机器的:

task->queue->worker


celery的代码示例:

通过MyRouter定义了不同的task到queue的分配方式

from __future__ import absolute_import
from celery import Celery
from kombu import Exchange, Queue
app = Celery('proj',
             broker='amqp://guest@bipython',
             backend='amqp://',
             include=['proj.tasks'])
#修改CELERY_QUEUES和MyRouter函数指定分配
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'proj.tasks.bipython_testsleep':
            return {
                'queue': 'bipython'
            }
        elif task == 'proj.tasks.sengtest_testsleep':
            return {
                "queue": "sengtest"
            }
        return None
default_exchange = Exchange('default', type='direct')
host_exchange = Exchange('host', type='direct')
     
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_QUEUES = (
        Queue('default', default_exchange, routing_key='default'),
        Queue('bipython', host_exchange, routing_key='host.bipython'),
        Queue('sengtest', host_exchange, routing_key='host.sengtest'),
    ),
    CELERY_DEFAULT_QUEUE = 'default',
    CELERY_DEFAULT_EXCHANGE = 'default',
    CELERY_DEFAULT_ROUTING_KEY = 'default',
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TIMEZONE='Asia/Shanghai',
    CELERY_ROUTES = (MyRouter(), ),
)
if __name__ == '__main__':
    app.start()

服务启动的命令:

通过-Q参数定义对应的queue

#host:bipython
celery -A proj worker -l info -Q bipython
#host:sengtest
celery -A proj worker -l info -Q sengtest


测试代码:

测试了直接调用和并行功能:

from proj.tasks import testsleep
from proj.tasks import sengtest_testsleep
from proj.tasks import bipython_testsleep
result2 = bipython_testsleep.delay(11)
result1 = sengtest_testsleep.delay(9)
result2 = bipython_testsleep.delay(11)
result1 = sengtest_testsleep.delay(9)
from celery import group
groupresult = group(sengtest_testsleep.s(9),bipython_testsleep.s(11))()
groupresult.get()

具体代码截图


测试结果:

bipython结果:

sengtest结果:


消息队列的检查命令:

在最初的测试过程中,无法确认是否发到消息队列,使用了一下命令检查,必要时可以使用

celery -A proj amqp

#查看消息队列的数量

basic.get default 

推荐 2
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册