Celery and routing

Developer https://www.devze.com 2023-04-10 15:14 Source: web
I need to run some tasks on a specific celeryd instance, so I configured queues:

celeryconfig.py:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'import': {
        'exchange': 'import',
        'binding_key': 'import.products',
    },
}

CELERY_ROUTES = {
    'celery_tasks.import_tasks.test': {
        'queue': 'import',
        'routing_key': 'import.products',
    },
}

import_tasks.py:

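from celery.task import task  # assumed import; the original post omits it

@task
def test():
    print('test')

# Routing options declared on the task itself
@task(exchange='import', routing_key='import.products')
def test2():
    print('test2')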

Then I start celeryd:

celeryd -c 2 -l INFO -Q import

And try to execute these tasks. 'test' executes but 'test2' does not. But I don't want to list every import task in CELERY_ROUTES. How can I specify, in the task definition itself, which queue should execute the task?


Oh, I forgot to say that I used the send_task function to execute the tasks. This function doesn't import the tasks; it just sends the task name to the queue, so options set in the @task decorator, such as exchange and routing_key, are not applied.

So instead of this:

from celery.execute import send_task

result = send_task(args.task, task_args, task_kwargs)

I wrote:

from celery import current_app as celery_app, registry as celery_registry

celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
    for module in celery_imports:
        __import__(module)

task = celery_registry.tasks.get(args.task)
if task:
    result = task.apply_async(task_args, task_kwargs)
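The import loop above can also be sketched with importlib, which returns the leaf module for dotted names (a bare __import__ returns the top-level package, although it still imports the submodule). This is only an illustration: import_task_modules is a hypothetical helper name, and the stdlib modules below stand in for the real entries of CELERY_IMPORTS.

```python
import importlib


def import_task_modules(module_names):
    """Import each named module so that its @task decorators run.

    Hypothetical helper: module_names plays the role of CELERY_IMPORTS
    and may be None or empty.
    """
    imported = []
    for name in module_names or ():
        # import_module returns the module named by the full dotted path
        imported.append(importlib.import_module(name))
    return imported


# Stand-in module names; a real call would pass the CELERY_IMPORTS value
modules = import_task_modules(['json', 'xml.dom'])
print([m.__name__ for m in modules])  # ['json', 'xml.dom']
```

Once the modules are imported, the tasks they define are registered and can be looked up by name, as in the snippet above.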


See Roman's solution -- http://www.imankulov.name/posts/celery-for-internal-api.html -- to access tasks by name, but also with the ability to specify queues and whatnot, as if you had imported the task module.


I found a solution that almost satisfies me:

class CustomRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('celery_tasks.import_tasks'):
            return {'exchange': 'import',
                    'routing_key': 'import.products'}

CELERY_ROUTES = (
    CustomRouter(),
)

The problem is that now I can't use names for tasks.
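A slightly more general variant of the router, as a rough sketch: it maps task-name prefixes to queue names and returns only {'queue': ...}, assuming (as I understand the Celery routing docs) that Celery then fills in the exchange and routing key from the matching CELERY_QUEUES entry. PrefixRouter and its prefix_to_queue argument are hypothetical names, not part of Celery.

```python
class PrefixRouter(object):
    """Route every task whose name starts with a known prefix to one queue."""

    def __init__(self, prefix_to_queue):
        # Hypothetical mapping, e.g. {'celery_tasks.import_tasks.': 'import'}
        self.prefix_to_queue = dict(prefix_to_queue)

    def route_for_task(self, task, args=None, kwargs=None):
        # Check longer prefixes first so the most specific one wins
        for prefix in sorted(self.prefix_to_queue, key=len, reverse=True):
            if task.startswith(prefix):
                return {'queue': self.prefix_to_queue[prefix]}
        # No match: return None so the next router or the default applies
        return None


CELERY_ROUTES = (
    PrefixRouter({'celery_tasks.import_tasks.': 'import'}),
)

router = CELERY_ROUTES[0]
print(router.route_for_task('celery_tasks.import_tasks.test2'))  # {'queue': 'import'}
print(router.route_for_task('celery_tasks.other.cleanup'))       # None
```

Because the router only sees the task name, it also applies to tasks sent with send_task, so new import tasks do not have to be listed one by one.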
