I've created a script that by default creates one multiprocessing Process, and in that configuration it works fine. When I start multiple processes it hangs, and not always in the same place. The program is about 700 lines of code, so I'll try to summarise what's going on.

I want to make the most of my multiple cores by parallelising the slowest task, which is aligning DNA sequences. For that I use the subprocess module to call a command-line program, 'hmmsearch', which I feed sequences through /dev/stdin and then read the aligned sequences back from /dev/stdout. I imagine the hang occurs because these multiple subprocess instances are all reading from and writing to stdin / stdout, and I really don't know the best way to go about this.

I was looking into os.fdopen(...) and os.tmpfile() to create temporary file handles or pipes that I can flush the data through. However, I've never used either before and I can't picture how to do that with the subprocess module. Ideally I'd like to bypass the hard drive entirely, because pipes are much better for high-throughput data processing! Any help with this would be super wonderful!!
    import multiprocessing, os, subprocess
    from Bio import SeqIO

    class align_seq( multiprocessing.Process ):
        def __init__( self, inPipe, outPipe, semaphore, options ):
            multiprocessing.Process.__init__(self)
            self.in_pipe = inPipe          ## Sequences in
            self.out_pipe = outPipe        ## Alignment out
            self.options = options.copy()  ## Modifiable sub-environment
            self.sem = semaphore

        def run(self):
            inp = self.in_pipe.recv()
            while inp != 'STOP':
                seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                       # HMM is a file location.
                align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
                self.sem.acquire()
                align_process.stdin.write( seq_record.format('fasta') )
                align_process.stdin.close()
                for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
                    self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
                align_process.wait()   # Don't know if there's any need for this??
                self.sem.release()
                align_process.stdout.close()
                inp = self.in_pipe.recv()
            self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
            self.out_pipe.close()
Having spent a while debugging this, I've found a problem that was always there and isn't quite solved yet, but I have fixed some other inefficiencies in the process (of debugging). There are two initial feeder functions: this align_seq class and a file parser, parseHMM(), which loads a position specific scoring matrix (PSM) into a dictionary. The main parent process then compares the alignment to the PSM, using a dictionary (of dictionaries) as a pointer to the relevant score for each residue.

To calculate the scores I want, I have two separate multiprocessing.Process classes. The first, logScore(), calculates the log odds ratio (with math.exp()); this is the one I parallelise. It queues the calculated scores to the last Process, sumScore(), which just sums these scores (with math.fsum) and returns the sum and all position specific scores to the parent process as a dictionary, i.e. Queue.put( [sum, { residue position : position specific score , ... } ] ). I find this exceptionally confusing to get my head around (too many queues!), so I hope that readers are managing to follow...

After all the above calculations are done, I give the option to save the cumulative scores as tab-delimited output. This is where it now (since last night) sometimes breaks, because I make sure it prints a score for every position where there should be one. I think that, because of timing differences between processes, what gets put in the Queue first for logScore doesn't always reach sumScore first. So that sumScore knows when to return the tally and start again, I put 'endSEQ' into the Queue for the last logScore process that performed a calculation. I thought it would then reach sumScore last too, but that's not always the case; only sometimes does it break. So now I no longer get a deadlock, but instead a KeyError when printing or saving the results.
I believe the reason I sometimes get a KeyError is that I create a separate Queue for each logScore process, when they should instead all share the same Queue. Now, where I have something like:
    class logScore( multiprocessing.Process ):
        def __init__( self, inQ, outQ ):
            self.inQ = inQ
            ...

    def scoreSequence( processes, HMMPSM, sequenceInPipe ):
        process_index = -1
        sequence = sequenceInPipe.recv_bytes()
        for residue in sequence:
            .... ## Get the residue score.
            process_index += 1
            processes[process_index].inQ.put( residue_score )
        ## End of sequence
        processes[process_index].inQ.put( 'endSEQ' )

    logScore_to_sumScoreQ = multiprocessing.Queue()
    logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
    sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
whereas I should create just one Queue to share between all the logScore instances. i.e.
    logScore_to_sumScoreQ = multiprocessing.Queue()
    scoreSeq_to_logScore = multiprocessing.Queue()
    logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
    sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
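Even with one shared input Queue, a single 'endSEQ' marker can still overtake scores computed by other workers. A minimal sketch of one way around that, under several assumptions: every score is tagged with its position, every worker forwards its own stop marker, and the collector finishes only once it has seen one marker per worker. The names log_score_worker, sum_scores, score_sequence and STOP are hypothetical, math.log is only a placeholder for the real calculation, the sketch handles one sequence per run, and it uses threads with queue.Queue so that it runs without a `__main__` guard (multiprocessing.Queue offers the same put/get calls).

```python
import math
import queue
import threading

STOP = "STOP"  # hypothetical sentinel; one is sent per worker


def log_score_worker(in_q, out_q):
    """Pull (position, odds) pairs until STOP, push (position, score) pairs."""
    while True:
        item = in_q.get()
        if item == STOP:
            out_q.put(STOP)  # tell the collector this worker has finished
            return
        position, odds = item
        out_q.put((position, math.log(odds)))  # placeholder calculation


def sum_scores(out_q, n_workers):
    """Collect until every worker has sent STOP; arrival order does not matter."""
    scores = {}
    finished = 0
    while finished < n_workers:
        item = out_q.get()
        if item == STOP:
            finished += 1
        else:
            position, score = item
            scores[position] = score
    return math.fsum(scores.values()), scores


def score_sequence(odds_by_position, n_workers=3):
    """Score one sequence using a single shared input queue."""
    in_q, out_q = queue.Queue(), queue.Queue()
    workers = [threading.Thread(target=log_score_worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    for worker in workers:
        worker.start()
    for position, odds in enumerate(odds_by_position):
        in_q.put((position, odds))
    for _ in workers:
        in_q.put(STOP)  # each worker consumes exactly one STOP
    total, scores = sum_scores(out_q, n_workers)
    for worker in workers:
        worker.join()
    return total, scores
```

Because each worker puts its STOP only after its last score, the collector cannot return before every position has arrived, whichever worker happens to finish first.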
That's not quite how pipelining works... but to put your mind at ease, here's an excerpt from the subprocess documentation:
stdin, stdout and stderr specify the executed programs’ standard input, standard output and standard error file handles, respectively. Valid values are PIPE, an existing file descriptor (a positive integer), an existing file object, and None. PIPE indicates that a new pipe to the child should be created. With None, no redirection will occur; the child’s file handles will be inherited from the parent.
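To illustrate the excerpt, here is a small sketch in which two children run at the same time, each with its own PIPE pair, and neither sees the other's data. It is written for Python 3, and the child is a stand-in Python one-liner that echoes stdin rather than hmmsearch; ECHO, start_child and run_two_children are hypothetical names.

```python
import subprocess
import sys

# Stand-in child process: copies its stdin to its stdout unchanged.
ECHO = [sys.executable, "-c",
        "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())"]


def start_child():
    """Start one child; PIPE asks Popen to create fresh pipes for it."""
    return subprocess.Popen(ECHO, stdin=subprocess.PIPE, stdout=subprocess.PIPE)


def run_two_children():
    """Run two children concurrently and return what each one wrote."""
    first, second = start_child(), start_child()
    # Talk to the children in the opposite order to the one they started in.
    out_second, _ = second.communicate(b"second\n")
    out_first, _ = first.communicate(b"first\n")
    return out_first, out_second
```

Each call to Popen with PIPE gets separate pipe objects, so the parent's own stdin and stdout are not involved at all.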
The likeliest areas at fault would be in the communication with the main process or in your management of the semaphore. Maybe state transitions / synchronization are not proceeding as expected due to a bug? I suggest debugging by adding logging/print statements before & after each blocking call - where you're communicating with the main process and where you acquire/release the semaphore to narrow down where things have gone wrong.
Also I'm curious - is the semaphore absolutely necessary?
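The logging suggestion above could look something like the following sketch. The traced helper and the logger name are hypothetical, and a threading.Semaphore stands in for the real one; the same wrapper could go around recv(), wait() or any other blocking call. Including %(processName)s in the format shows which worker is stuck.

```python
import logging
import threading
from contextlib import contextmanager

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(processName)s %(message)s",
)
log = logging.getLogger("align_seq")  # hypothetical logger name


@contextmanager
def traced(label):
    """Log just before the wrapped block starts and again when it exits."""
    log.debug("%s: waiting", label)
    try:
        yield
    finally:
        log.debug("%s: returned", label)


# Usage with a stand-in semaphore.
sem = threading.Semaphore(1)
with traced("sem.acquire"):
    sem.acquire()
sem.release()
```

A "waiting" line with no matching "returned" line in the output points at the call that never came back.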
I also wanted to parallelize simple tasks, and for that I created a little Python script. You can take a look at: http://bioinf.comav.upv.es/psubprocess/index.html
It is a little more general than what you want, but for simple tasks it is quite easy to use. It might at least be of some inspiration to you.
Jose Blanca
It could be a deadlock in subprocess. Have you tried using the communicate method rather than wait? http://docs.python.org/library/subprocess.html
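A minimal sketch of the communicate approach, written for Python 3. communicate() sends the input, closes stdin, and reads stdout until the child exits, so neither pipe can fill up while the other side is blocked. The child here is a stand-in Python one-liner that upper-cases its input, not hmmsearch, and run_filter and UPPER are hypothetical names.

```python
import subprocess
import sys


def run_filter(argv, text):
    """Feed text to a child on stdin and return everything it wrote to stdout."""
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, _ = proc.communicate(text.encode())  # writes, closes stdin, reads, waits
    if proc.returncode != 0:
        raise RuntimeError("child exited with status %d" % proc.returncode)
    return out.decode()


# Stand-in for the real command line: upper-cases whatever arrives on stdin.
UPPER = [sys.executable, "-c",
         "import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())"]

result = run_filter(UPPER, ">seq1\nacgt\n")
```

In the question's code, the returned text could presumably be wrapped in io.StringIO and handed to SeqIO.parse in place of align_process.stdout, which would also make the separate wait() call unnecessary.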