
How to manage Python threads' results?


I am using this code:

def startThreads(arrayofkeywords):
    global i
    i = 0
    while len(arrayofkeywords):
        try:
            if i<maxThreads:
                keyword = arrayofkeywords.pop(0)
                i = i+1
                thread = doStuffWith(keyword)
                thread.start()
        except KeyboardInterrupt:
            sys.exit()
    thread.join()

For threading in Python, I have almost everything done, but I don't know how to manage the results of each thread. Each thread produces an array of strings as its result; how can I join all those arrays into one safely? If I try writing into a global array, two threads could be writing at the same time.


First, you actually need to save all those thread objects to call join() on them. As written, you're saving only the last one of them, and then only if there isn't an exception.

An easy way to do multithreaded programming is to give each thread all the data it needs to run, and then have it not write to anything outside that working set. If all threads follow that guideline, their writes will not interfere with each other. Then, once a thread has finished, have the main thread alone aggregate the results into a global array. This is known as "fork/join parallelism."

If you subclass the Thread object, you can give it space to store that return value without interfering with other threads. Then you can do something like this:

class MyThread(threading.Thread):
    def __init__(self, ...):
        threading.Thread.__init__(self)  # initialize the base class first
        self.result = []
        ...

def main():
    # doStuffWith() returns a MyThread instance
    threads = [doStuffWith(k) for k in arrayofkeywords[:maxThreads]]
    for t in threads:
        t.start()  # start() returns None, so don't call it inside the comprehension
    for t in threads:
        t.join()
        ret = t.result
        # process return value here

Edit:

After looking around a bit, it seems like the above method isn't the preferred way to do threads in Python. The above is more of a Java-esque pattern for threads. Instead you could do something like:

def handler(outList):
    ...
    # Modify the existing object (important!)
    outList.append(1)
    ...

def doStuffWith(keyword):
    ...
    result = []
    thread = Thread(target=handler, args=(result,))
    return (thread, result)

def main():
    threads = [doStuffWith(k) for k in arrayofkeywords[:maxThreads]]
    for thread, result in threads:
        thread.start()
    for thread, result in threads:
        thread.join()
        ret = result
        # process return value here


Use a Queue.Queue instance (the queue module in Python 3), which is intrinsically thread-safe. Each thread can .put its results to that global instance when it's done, and the main thread (when it knows all the worker threads are done, e.g. by .joining them as in @unholysampler's answer) can loop .getting each result from it, using each result to .extend the "overall result" list until the queue is empty.
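
Here is a minimal sketch of that pattern, assuming a hypothetical process(keyword) function that returns a list of strings:

import queue
import threading

results_q = queue.Queue()  # intrinsically thread-safe: no explicit locking needed

def worker(keyword):
    results_q.put(process(keyword))  # process() is a hypothetical stand-in for the real work

threads = [threading.Thread(target=worker, args=(k,)) for k in arrayofkeywords]
for t in threads:
    t.start()
for t in threads:
    t.join()

overall = []
while not results_q.empty():  # safe here: all producers have been joined
    overall.extend(results_q.get())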

Edit: there are other big problems with your code -- if the maximum number of threads is less than the number of keywords, it will never terminate (you're trying to start a thread per keyword -- never less -- but once you've already started the maximum number, you loop forever to no further purpose).

Consider instead using a threading pool, kind of like the one in this recipe, except that in lieu of queueing callables you'll queue the keywords -- since the callable you want to run in the thread is the same in each thread, just varying the argument. Of course that callable will be changed to peel something from the incoming-tasks queue (with .get) and .put the list of results to the outgoing-results queue when done.

To terminate the N threads you could, after all keywords, .put N "sentinels" (e.g. None, assuming no keyword can be None): a thread's callable will exit if the "keyword" it just pulled is None.
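
Here is a rough sketch of that pool-plus-sentinels arrangement, again assuming a hypothetical process(keyword) that returns a list of strings, and an assumed worker count of 4:

import queue
import threading

NUM_WORKERS = 4  # assumed pool size
tasks = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        keyword = tasks.get()
        if keyword is None:  # sentinel: no more work for this thread
            break
        results.put(process(keyword))  # process() is hypothetical

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for k in arrayofkeywords:
    tasks.put(k)
for _ in range(NUM_WORKERS):
    tasks.put(None)  # one sentinel per worker thread
for t in threads:
    t.join()

overall = []
while not results.empty():
    overall.extend(results.get())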

More often than not, Queue.Queue offers the best way to organize threading (and multiprocessing!) architectures in Python, be they generic like in the recipe I pointed you to, or more specialized like I'm suggesting for your use case in the last two paragraphs.


You need to keep a reference to each thread you make. As is, your code only ensures that the last created thread finishes. This does not imply that all the ones you started before it have also finished.

def startThreads(arrayofkeywords):
    global i
    i = 0
    threads = []
    while len(arrayofkeywords):
        try:
            if i<maxThreads:
                keyword = arrayofkeywords.pop(0)
                i = i+1
                thread = doStuffWith(keyword)
                thread.start()
                threads.append(thread)
        except KeyboardInterrupt:
            sys.exit()
    for t in threads:
        t.join()
    # process results stored in each thread

This also solves the problem of write access, because each thread will store its data locally. Then, after all of them are done, you can do the work to combine each thread's local data, as sketched below.
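
A minimal sketch of that combining step, assuming each thread object stores its output in a result attribute (as with the MyThread subclass in the first answer):

allresults = []
for t in threads:
    allresults.extend(t.result)  # safe: every thread has already been joined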


I know that this question is a little bit old, but the best way to do this is not to hurt yourself too much with the approaches proposed by the other answers :)

Please read the reference on Pool. That way you fork/join your work:

from multiprocessing import Pool

def doStuffWith(keyword):
    return keyword + ' processed in thread'

def startThreads(arrayofkeywords):
    pool = Pool(processes=maxThreads)
    result = pool.map(doStuffWith, arrayofkeywords)
    print(result)
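
Note that multiprocessing.Pool runs the work in separate processes, not threads. If you specifically want threads with the same map-style API, the thread-backed clone in multiprocessing.dummy is one option:

from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool, backed by threads

pool = Pool(maxThreads)
result = pool.map(doStuffWith, arrayofkeywords)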


Writing into a global array is fine if you use a semaphore to protect the critical section. You 'acquire' the lock when you want to append to the global array, then 'release' when you are done. This way, only one thread is ever appending to the array at a time.

Check out http://docs.python.org/library/threading.html and search for semaphore for more info.

sem = threading.Semaphore()
...
sem.acquire()
# do dangerous stuff
sem.release()
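
For example, a short sketch of that idea (a plain threading.Lock would work equally well), assuming a hypothetical process(keyword) that returns a list of strings:

import threading

sem = threading.Semaphore()
allresults = []  # shared by every thread

def worker(keyword):
    strings = process(keyword)  # process() is hypothetical
    sem.acquire()
    try:
        allresults.extend(strings)  # critical section: one thread at a time
    finally:
        sem.release()  # always release, even if extend raises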


Try the semaphore methods acquire and release: http://docs.python.org/library/threading.html
