I have a process that needs to perform a bunch of actions "later" (usually after 10-60 seconds). The problem is that there can be a lot of those "later" actions (thousands), so using a Thread per task is not viable. I know of tools like gevent and eventlet, but one of the problems is that the process uses zeromq for communication, so I would need some integration (eventlet already has it).
What I'm wondering is: what are my options? Suggestions are welcome along the lines of libraries (if you've used any of those mentioned, please share your experiences), techniques (Python's "coroutine" support, using one thread that sleeps for a while and checks a queue), how to make use of zeromq's poll or event loop to do the job, or something else.
Consider using a priority queue with one or more worker threads to service the tasks. The main thread can add work to the queue, with a timestamp of the soonest it should be serviced. Worker threads pop work off the queue, sleep until the time given by the priority value is reached, do the work, and then pop another item off the queue.
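A minimal sketch of that idea, assuming a single worker and Python 3's queue.PriorityQueue. The helper names (schedule, stop, worker) are made up for illustration, and a worker that is already sleeping on a far-future item will not notice a more urgent item added later:

```python
import itertools
import queue
import threading
import time

_sequence = itertools.count()  # tie-breaker so two actions are never compared


def schedule(tasks, delay, action):
    """Queue `action` to run no sooner than `delay` seconds from now."""
    tasks.put((time.monotonic() + delay, next(_sequence), action))


def stop(tasks):
    """Queue a sentinel that sorts after all real work."""
    tasks.put((float("inf"), next(_sequence), None))


def worker(tasks, results):
    """Pop the earliest item, sleep until it is due, run it, repeat."""
    while True:
        due, _, action = tasks.get()
        if action is None:  # sentinel: no more work
            break
        remaining = due - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        results.append(action())
```

To try it, create a queue.PriorityQueue(), call schedule() a few times, call stop(), and run worker() in a threading.Thread.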
How about a more fleshed-out answer? mklauber makes a good point. If there's a chance all of your workers might be sleeping when you have new, more urgent work, then a queue.PriorityQueue isn't really the solution, although a "priority queue" is still the technique to use, and one is available from the heapq module. Instead, we'll make use of a different synchronization primitive: a condition variable, which in Python is spelled threading.Condition.
The approach is fairly simple: peek at the heap, and if the work is due, pop it off and do that work. If there is work but it's scheduled for the future, just wait on the condition until then; if there's no work at all, sleep until notified.
The producer does its fair share of the work: every time it adds new work, it notifies the condition, so if there are sleeping workers, one of them will wake up and recheck the queue for newer work.
import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()

def consumer(message):
    """the actual work function. never mind the locks here, this just keeps
    the output nicely formatted. a real work function probably won't need
    it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print(time.time() - START_TIME, message)
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            # peek at the earliest item without removing it
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                # release the lock while working so other workers can proceed
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                # not due yet: wait until it is, or until new work is announced
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    for thread in pool:
        thread.start()
    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')
    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    for thread in pool:
        thread.join()
This answer actually has two suggestions: my first one, and another that I discovered afterwards.
sched
I suspect you are looking for the sched module.
EDIT: my bare suggestion seemed of little help when I reread it, so I decided to test the sched module to see whether it works as I suggested. Here is my test: I would use it with a single thread, more or less this way:
import sched, threading, time

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print("Registerd event", event)
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
First, I'd create a thread class which would have its own scheduler and a queue. At least one event would be registered in the scheduler: one for invoking a method for scheduling events from the queue.
class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
The method for scheduling events from the queue locks the queue, schedules each event, empties the queue and then schedules itself again, so that it checks for new events a little later. Note that the polling period is short (one second); you may change it:
    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print("Registerd event", event)
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
The class should also have a method for scheduling user events. Naturally, this method should lock the queue while updating it:
    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))
Finally, the class should invoke the scheduler main method:
    def run(self):
        self.scheduler.run()
Here is a usage example:
def print_time():
    print("scheduled:", time.time())

if __name__ == "__main__":
    st = SchedulingThread()
    st.start()
    st.schedule(print_time, 10)
    while True:
        print("main thread:", time.time())
        time.sleep(5)
    st.join()  # never reached: the loop above runs until interrupted
Its output on my machine is:
$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77
This code is just a quick'n'dirty example and may need some work. However, I have to confess that I am a bit fascinated by the sched module, which is why I suggested it. You may want to look for other suggestions as well :)
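For reference, the bare sched API that the class above wraps can be sketched like this. It is only an illustration: run_in_order is a made-up helper, and it assumes Python 3, where enter() accepts argument as a keyword. Note that run() sleeps until the next event is due and does not wake up early for events entered from another thread in the meantime, which is why the class above polls its own queue every second:

```python
import sched
import time


def run_in_order(tasks):
    """Run (delay_seconds, label) pairs through one scheduler.

    Returns the labels in the order in which they fired.
    """
    scheduler = sched.scheduler(time.monotonic, time.sleep)
    fired = []
    for delay, label in tasks:
        # enter(delay, priority, action, argument): delay is relative to now
        scheduler.enter(delay, 1, fired.append, argument=(label,))
    scheduler.run()  # blocks until the event queue is empty
    return fired
```

Calling run_in_order([(0.2, "late"), (0.05, "early")]) fires "early" first, whatever order the events were entered in.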
APScheduler
While searching Google for solutions like the one I've posted, I found the amazing APScheduler module. It is so practical and useful that I bet it is your solution. My previous example would be way simpler with this module:
from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)
def print_time():
    print("scheduled:", time.time())
    # run only once: remove the job from inside its first run
    sch.unschedule_func(print_time)

while True:
    print("main thread:", time.time())
    time.sleep(5)
(Unfortunately I did not find out how to schedule an event to execute only once, so the function has to unschedule itself. I bet it can be solved with some decorator.)
If you have a bunch of tasks that need to get performed later, and you want them to persist even if you shut down the calling program or your workers, you should really look into Celery, which makes it super easy to create new tasks, have them executed on any machine you'd like, and wait for the results.
From the Celery page, "This is a simple task adding two numbers:"
from celery.task import task

@task
def add(x, y):
    return x + y
You can execute the task in the background, or wait for it to finish:
>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
You wrote:
one of the problems is that the process uses zeromq for communication so I would need some integration (eventlet already has it)
It seems your choice will be heavily influenced by these details, which are a bit unclear: how is zeromq being used for communication, how many resources will the integration require, and what are your requirements and available resources?
There's a project called django-ztask which uses zeromq and provides a task decorator similar to Celery's. However, it is (obviously) Django-specific and so may not be suitable in your case. I haven't used it; I prefer Celery myself.
I've been using Celery for a couple of projects (these are hosted at ep.io PaaS hosting, which provides an easy way to use it).
Celery looks like quite a flexible solution, allowing delayed tasks, callbacks, task expiration and retrying, limiting of the task execution rate, etc. It may be used with Redis, Beanstalk, CouchDB, MongoDB or an SQL database.
Example code (definition of task and asynchronous execution after a delay):
from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)
See also a section in celery docs.
Have you looked at the multiprocessing module? It comes standard with Python. It is similar to the threading module, but runs each task in a process. You can use a Pool() object to set up a worker pool, then use the .map() method to call a function with the various queued task arguments.
Pyzmq has an ioloop implementation with an API similar to that of the tornado ioloop. It implements a DelayedCallback which may help you.
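If you would rather drive the timers from your own poll loop than from an ioloop, the usual trick is to pass the time until the next due action as the poll timeout. Below is a rough, stdlib-only sketch of the bookkeeping. TimerQueue and its methods are hypothetical names, not part of pyzmq; the idea is that the loop calls something like poller.poll(timers.timeout_ms()) on a zmq.Poller (whose timeout is in milliseconds, with None meaning "wait forever"), handles any socket events, and then calls timers.run_due():

```python
import heapq
import itertools
import math
import time


class TimerQueue:
    """Delayed callbacks meant to be driven from an existing poll loop."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []
        self._counter = itertools.count()  # tie-breaker: callbacks are never compared

    def call_later(self, delay, callback, *args):
        """Register callback(*args) to run `delay` seconds from now."""
        due = self._clock() + delay
        heapq.heappush(self._heap, (due, next(self._counter), callback, args))

    def timeout_ms(self):
        """Milliseconds until the next callback is due, or None if none is pending."""
        if not self._heap:
            return None
        remaining = self._heap[0][0] - self._clock()
        # round up so the loop does not spin during the last fraction of a millisecond
        return max(0, math.ceil(remaining * 1000))

    def run_due(self):
        """Run every callback whose time has come; return how many ran."""
        ran = 0
        now = self._clock()
        while self._heap and self._heap[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._heap)
            callback(*args)
            ran += 1
        return ran
```

Everything runs in the polling thread, so no locking is needed as long as call_later() is only called from that thread as well.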
Presuming your process has a run loop that can receive signals, and each action is short enough to be run sequentially, use signals and POSIX alarm():
signal.alarm(time)
If time is non-zero, this function requests that a SIGALRM signal be sent to the process in time seconds.
This depends on what you mean by "those "later" actions can be a lot" and on whether your process already uses signals. Given the phrasing of the question, it's unclear why an external Python package would be needed.
Another option is to use the Python GLib bindings, in particular its timeout functions.
It's a good choice as long as you don't want to make use of multiple cores and as long as the dependency on GLib is no problem. It handles all events in the same thread, which prevents synchronization issues. Additionally, its event framework can also be used to watch and handle IO-based events (e.g. on sockets).
UPDATE:
Here's a live session using GLib:
>>> import time
>>> import glib
>>>
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
...
>>> ml = glib.MainLoop()
>>>
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>>
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
Well, in my opinion you could use something called "cooperative multitasking". It's a Twisted-based thing and it's really cool. Just look at the PyCon presentation from 2010: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182
Well, you will need a transport queue to do this too...
Simple. You can inherit your class from Thread and give each instance a parameter such as timeout, so that each instance's thread waits for that long before doing its work.
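A rough sketch of what such a class could look like. DelayedTask is a made-up name, and the standard library's threading.Timer already does much the same thing. Keep in mind this is still one thread per pending task, which the question describes as not viable for thousands of tasks:

```python
import threading


class DelayedTask(threading.Thread):
    """Thread that waits `timeout` seconds and then runs `action` once."""

    def __init__(self, timeout, action):
        super().__init__(daemon=True)
        self.timeout = timeout
        self.action = action
        self.result = None
        self._cancelled = threading.Event()

    def cancel(self):
        """Stop the task if it has not started running yet."""
        self._cancelled.set()

    def run(self):
        # Event.wait() returns False on timeout and True if cancel() was called
        if not self._cancelled.wait(self.timeout):
            self.result = self.action()
```

Usage is DelayedTask(10, some_function).start(); waiting on an Event instead of calling time.sleep() is what makes cancel() take effect immediately.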