I'm working on a subclass of threading.Thread
which allows its methods to be called and run in the thread represented by the object that they are called on as opposed to the usual behavior. I do this by using decorators on the target method that place the call to the method in a collections.deque
and using the run
method to process the deque.
the run
method uses a while not self.__stop:
statement and a threading.Condition
object to wait for a call to be placed in the deque and then call self.__process_calls
. The else
part of the while
loop makes a final call to __process_calls
. if self.__stop
, an exception is raised on any attempts to call one of the 'callable' methods from another thread.
The problem is that __process_calls
fails to return unless the last statement is a print
which I discovered during debugging. I've tried a = 1
and an explicit return
but neither work. with any print
statement as the final statement of the function though, it returns and the thread doesn't hang. Any ideas what's going on?
EDIT: It was pointed out by David Zaslavsky that the print works because it takes a while and I've confirmed that
The code's a little long but hopefully, my explanation above is clear enough to help understand it.
import threading
import collections
class BrokenPromise(Exception): pass
class CallableThreadError(Exception): pass
class CallToNonRunningThreadError(CallableThreadError): pass
class Promise(object):
def __init__(self, deque, condition):
self._condition = condition
self._deque = deque
def read(self, timeout=None):
if not self._deque:
with self._condition:
if timeout:
self._condition.wait(timeout)
else:
self._condition.wait()
if self._deque:
value = self._deque.popleft()
del self._deque
del self._condition
return value
else:
raise BrokenPromise
def ready(self):
return bool(self._deque)
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
# _enqueued_calls is used to store tuples that encode a function call.
# It is processed by the run method
self.__enqueued_calls = collections.deque()
# _e开发者_开发百科nqueue_call_permission is for callers to signal that they have
# placed something on the queue
self.__enqueue_call_permission = threading.Condition()
self.__stop = False
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator function to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks
# until the decorated function# is called and returns. It then returns
# the value unmodified. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
f = CallableThread.nonblocking_method_with_promise(f)
def register(self, *args, **kwargs):
p = f(self, *args, **kwargs)
return p.read()
return register
@staticmethod
def nonblocking_method_with_promise(f):
u"""A decorator function to implement a non-blocking method on a
thread
"""
# the returned function enqueues the decorated function and returns a
# Promise object.N The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on.
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response_deque = collections.deque()
self.__push_call(f, args, kwargs, response_deque, call_complete)
return Promise(response_deque, call_complete)
return register
@staticmethod
def nonblocking_method(f):
def register(self, *args, **kwargs):
self.__push_call(f, args, kwargs)
return register
def run(self):
while not self.__stop: # while we've not been killed
with self.__enqueue_call_permission:
# get the condition so that we can wait on it if we need too.
if not self.__enqueued_calls:
self.__enqueue_call_permission.wait()
self.__process_calls()
else:
# if we exit because self._run == False, finish processing
# the pending calls if there are any
self.__process_calls()
def stop(self):
u""" Signal the thread to stop"""
with self.__enqueue_call_permission:
# we do this in case the run method is stuck waiting on an update
self.__stop = True
self.__enqueue_call_permission.notify()
def __process_calls(self):
print "processing calls"
while self.__enqueued_calls:
((f, args, kwargs),
response_deque, call_complete) = self.__enqueued_calls.popleft()
if call_complete:
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
else:
f(self, *args, **kwargs)
# this is where you place the print statement if you want to see the
# behavior
def __push_call(self, f, args, kwargs, response_deque=None,
call_complete=None):
if self.__stop:
raise CallToNonRunningThreadError(
"This thread is no longer accepting calls")
with self.__enqueue_call_permission:
self.__enqueued_calls.append(((f, args, kwargs),
response_deque, call_complete))
self.__enqueue_call_permission.notify()
#if __name__=='__main__': i lost the indent on the following code in copying but
#it doesn't matter in this context
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.nonblocking_method_with_promise
def increment(self):
self.counter += 1
return self.counter
class LogThread(CallableThread):
@CallableThread.nonblocking_method
def log(self, message):
print message
l = LogThread()
l.start()
l.log("logger started")
t = TestThread()
t.start()
l.log("test thread started")
p = t.increment()
l.log("promise aquired")
v = p.read()
l.log("promise read")
l.log("{0} read from promise".format(v))
l.stop()
t.stop()
l.join()
t.join()
__process_calls
is modifying__enqueued_calls
without owning the lock. This may be creating a race condition.Edit: deque may be "threadsafe" (ie not corrupted by thread accesses), but the checking of its state still should be locked.
The stop condition is also not safe.
Comments inline:
def run(self):
while not self.__stop: # while we've not been killed
with self.__enqueue_call_permission:
# get the condition so that we can wait on it if we need too.
### should be checking __stop here, it could have been modified before
### you took the lock.
if not self.__enqueued_calls:
self.__enqueue_call_permission.wait()
self.__process_calls()
else:
# if we exit because self._run == False, finish processing
# the pending calls if there are any
self.__process_calls()
精彩评论