I've tried both the multiprocessing included in the python2.6 Ubuntu
package (__version__
says 0.70a1) and the latest from PyPI (2.6.2.1).
In both cases I don't know how to use imap correctly - it causes the
entire interpreter to stop responding to ctrl-C's (map works fine though). pdb shows next()
is hanging开发者_开发百科 on the condition variable wait()
call in IMapIterator
, so nobody is waking us up. Any hints? Thanks
in advance.
$ cat /tmp/go3.py
import multiprocessing as mp
print mp.Pool(1).map(abs, range(3))
print list(mp.Pool(1).imap(abs, range(3)))
$ python /tmp/go3.py
[0, 1, 2]
^C^C^C^C^C^\Quit
First notice that this works:
import multiprocessing as mp
import multiprocessing.util as util
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))
The difference is that pool
does not get finalized when the call to pool.imap()
ends.
In contrast,
print(list(mp.Pool(1).imap(abs, range(3))))
causes the Pool
instance to be finalized soon after the imap
call ends.
The lack of a reference causes the Finalizer
(called self._terminate
in the Pool
class) to be called. This sets in motion a sequence of commands which tears down the task handler thread, result handler thread, worker subprocesses, etc.
This all happens so quickly, that at least on a majority of runs, the task sent to the task handler does not complete.
Here are the relevant bits of code:
From /usr/lib/python2.6/multiprocessing/pool.py:
class Pool(object):
def __init__(self, processes=None, initializer=None, initargs=()):
...
self._terminate = Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._task_handler, self._result_handler, self._cache),
exitpriority=15
)
/usr/lib/python2.6/multiprocessing/util.py:
class Finalize(object):
'''
Class which supports object finalization using weakrefs
'''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
...
if obj is not None:
self._weakref = weakref.ref(obj, self)
The weakref.ref(obj,self)
causes self()
to be called when obj
is about to be finalized.
I used the debug command util.log_to_stderr(util.SUBDEBUG)
to learn the sequence of events. For example:
import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
print(list(mp.Pool(1).imap(abs, range(3))))
yields
[DEBUG/MainProcess] created semlock with handle 3077013504
[DEBUG/MainProcess] created semlock with handle 3077009408
[DEBUG/MainProcess] created semlock with handle 3077005312
[DEBUG/MainProcess] created semlock with handle 3077001216
[INFO/PoolWorker-1] child process calling self.run()
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
...
and compare that with
import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
pool=mp.Pool(1)
print list(pool.imap(abs, range(3)))
which yields
[DEBUG/MainProcess] created semlock with handle 3078684672
[DEBUG/MainProcess] created semlock with handle 3078680576
[DEBUG/MainProcess] created semlock with handle 3078676480
[DEBUG/MainProcess] created semlock with handle 3078672384
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[0, 1, 2]
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15>
...
[DEBUG/MainProcess] finalizing pool
In my case, I was calling the pool.imap()
without expecting a return value and not getting it to work. However, if I tried it with pool.map()
it worked fine. The issue was exactly as the previous answer stated: there was no finalizer called, so the process was effectively dumped before it was started.
The solution was to evoke a finalizer such as a list()
function. This caused it to work correctly, since it now requires fulfillment to be handed to the list function, and thus the process was executed. In brief, it is explained below (this is, of course, simplified. For now, just pretend it's something useful):
from multiprocessing import Pool
from shutil import copy
from tqdm import tqdm
filedict = { r"C:\src\file1.txt": r"C:\trg\file1_fixed.txt",
r"C:\src\file2.txt": r"C:\trg\file2_fixed.txt",
r"C:\src\file3.txt": r"C:\trg\file3_fixed.txt",
r"C:\src\file4.txt": r"C:\trg\file4_fixed.txt" }
# target process
def copyfile(srctrg):
copy(srctrg[0],srctrg[1])
return True
# a couple of trial processes for illustration
with Pool(2) as pool:
# works fine with map, but cannot utilize tqdm() since no iterator object is returned
pool.map(copyfile,list(filedict.items()))
# will not work, since no finalizer is called for imap
tqdm(pool.imap(copyfile,list(filedict.items()))) # NOT WORKING
# this works, since the finalization is forced for the process
list(tqdm(pool.imap(copyfile,list(filedict.items()))))
In my case, the simple solution was to enclose the entire tqdm(pool.imap(...))
in a list()
in order to force the execution.
精彩评论