Thread safety in Django with asynchronous tasks and Redis

https://www.devze.com 2023-02-22 06:31 Source: web

I have a Django application that calls an asynchronous task on a queryset (using Celery). The task takes the queryset and performs a whole bunch of operations that could potentially take a very long time, depending on the objects therein. Objects can be shared across querysets, so a user could submit a task on a queryset containing objects that are already running. That new task should only execute on the objects that aren't yet running, but wait for all objects to complete before it returns.

My explanation is a bit confusing, so imagine the following code:

from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while

class LongRunningTask(Task):
    def run(self, process_id, *args, **kwargs):
        _queryset = InterestingModel.objects.filter(process__id=process_id)

        r = redis.Redis()
        p = r.pipeline()  # transactional (MULTI/EXEC) by default, so the block below runs atomically
        run_check_sets = ('run_check', 'objects_already_running')

        # There must be a better way to do this:
        pks = list(_queryset.values_list('pk', flat=True))
        for o in pks:
            p.sadd('run_check', o)  # sadd takes the member as well as the key
        p.sdiff(run_check_sets)   # Objects that need to be run
        p.sunion(run_check_sets)  # Objects that we need to wait for
        p.sunionstore('objects_already_running', run_check_sets)
        p.delete('run_check')
        redis_result = p.execute()

        # One result per queued command: len(pks) sadds, then
        # sdiff, sunion, sunionstore, delete -- so count from the end.
        objects_to_run = redis_result[-4]
        objects_to_wait_for = redis_result[-3]

        if objects_to_run:
            i_take_a_while(objects_to_run)
            p = r.pipeline()
            for o in objects_to_run:
                p.srem('objects_already_running', o)
            p.execute()

        while objects_to_wait_for:
            p = r.pipeline()
            for o in objects_to_wait_for:
                p.sismember('objects_already_running', o)
            redis_result = p.execute()
            objects_to_wait_for = [objects_to_wait_for[i]
                                   for i, member in enumerate(redis_result) if member]
            if objects_to_wait_for:
                # Probably need to add some sort of timeout here or in redis
                sleep(30)
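For what it's worth, one thing I noticed while reading the SADD docs: its return value already tells you whether the member was newly added, so each task could atomically claim exactly the objects nobody else holds, without the sdiff/sunionstore dance. A minimal sketch of that pattern (the function names and the claimed/busy split are my own, not from the code above; `r` is any redis-py client):

```python
def claim_objects(r, key, pks):
    """Atomically claim each pk via SADD's return value.

    SADD returns 1 if the member was newly added (we claimed it),
    0 if it was already present (another task is running it).
    """
    claimed, busy = [], []
    for pk in pks:
        if r.sadd(key, pk):
            claimed.append(pk)
        else:
            busy.append(pk)
    return claimed, busy


def release_objects(r, key, pks):
    """Drop our claims once the long-running work is done."""
    for pk in pks:
        r.srem(key, pk)
```

With a real connection you'd pass `redis.Redis()` and the `'objects_already_running'` key; you would still need something like the sismember polling loop above to wait for the `busy` objects to finish.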

I am extremely new to Redis, so my main question is whether there is a more efficient way to manipulate Redis to achieve the same result. More broadly, I wonder whether Redis is necessary, or even the right approach, for this problem. It seems like there should be a better way to make Django models interact with Redis. Finally, I wonder if this code is, in fact, thread safe. Can anyone punch any holes in my logic?

Any commentary is appreciated.


Is it possible for you to architect this slightly differently? Specifically, I would kick off the tasks for each object and then store information about your long running jobs somewhere (e.g., database, cache, etc). When each individual object was finished, it would update the long running job info and check to see if all of the jobs had returned. If so, then you can run whatever code needs to be run when the long running task is complete.

This has the advantage of not tying up a thread on your server while you wait for other things to happen. On the client side, you could check the status of the long running job periodically and even use the number of objects complete to update a progress meter if you want.
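The bookkeeping for that design can be very small: record a remaining-count when the job starts, have each per-object task decrement it on completion, and treat reaching zero as job completion. A rough sketch (function names are illustrative; `store` stands in for Redis or your database, where you would use an atomic operation like Redis's DECR rather than plain Python arithmetic):

```python
def start_job(store, job_id, object_ids):
    """Record how many per-object tasks this job fans out to.

    In the real app, each object_id would be handed to its own
    Celery task here (e.g. something like process_object.delay(job_id, oid)).
    """
    store['job:%s:remaining' % job_id] = len(object_ids)


def object_finished(store, job_id):
    """Called by each per-object task when it completes.

    Returns True exactly once -- when the last object finishes --
    so the caller knows to run the job's completion code.
    With Redis you'd use r.decr(key), which is atomic across workers.
    """
    key = 'job:%s:remaining' % job_id
    store[key] -= 1
    return store[key] == 0
```

The client can read the same remaining-count to drive the progress meter.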
