I am trying to parallelize some work, which runs on my mac (Pyton 3.2.2 under Mac OS 10.7) but gives the following error on a Linux cluster I run it where I got 4 cores and access Python 3.2. The error messages continue until I break execution manually.
Exception in thread Thread-2:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/threading.py", line 736, in _bootstrap_inner
self.run()
File "/n/sw/python-3.2/lib/python3.2/threading.py", line 689, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 338, in _handle_tasks
put(task)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
Process PoolWorker-2:
Process PoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-1:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-12:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
self.run()
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker
Process PoolWorker-11:
Traceback (most recent call last):
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
self.run()
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker
Of course, for reference, here is part of my code. I don't see how this call of multiprocessing.Pool should result in these errors, esp. PoolWorkers with numbers higher than the 4 processes. Thanks for any thoughts!
import csv
import networkx as nx
import time
import shutil
import datetime
import pydot
import os
import re
import logging
from operator import itemgetter
import numpy as np
from multiprocessing import Pool
import itertools
# Dictionary for edge attributes in projected graph:
# 0: overlap_length
# 1: overlap_start
# 2: overlap_end
# 3: cell
# 4: level
def chunks(l,n):
"""Divide a list of nodes `l` in `n` chunks"""
l_c = iter(l)
while 1:
x = tuple(itertools.islice(l_c,n))
if not x:
return
yield x
def overlaps(G,B,u,nbrs2):
l = []
for v in nbrs2:
for mutual_cell in set(B[u]) & set(B[v]):
for uspell in B.get_edge_data(u,mutual_cell).values():
ustart = uspell[1]
uend = uspell[2]
for vspell in B.get_edge_data(v,mutual_cell).values():
vstart = vspell[1]
vend = vspell[2]
if uend > vstart and vend > ustart:
ostart = max(ustart,vstart)
oend = min(uend,vend)
olen = (oend-ostart+1)/86400
ocell = mutual_cell
if (v not in G[u] or ostart not in [ edict[1] for edict in G[u][v].values() ]):
l.append((u,v,{0: olen,1: ostart,2: oend,3: ocell}))
return l
def _pmap1(arg_tuple):
"""Pool for multiprocess only accepts functions with one argument. This function
uses a tuple as its only argument.
"""
return overlaps(arg_tuple[0],arg_tuple[1],arg_tuple[2],arg_tuple[3])
def time_overlap_projected_graph_parallel(B, nodes):
G=nx.MultiGraph()
G.add_nodes_from((n,B.node[n]) for n in nodes)
add_edges_from = nx.MultiGraph.add_edges_from
开发者_开发技巧 get_edge_data = nx.MultiGraph.get_edge_data
p = Pool(processes=4)
node_divisor = len(p._pool)
for u in nodes:
unbrs = set(B[u])
nbrs2 = set((n for nbr in unbrs for n in B[nbr])) - set([u])
# iterate over subsets of neighbors - parallelize
node_chunks = list(chunks(nbrs2,int(len(nbrs2)/int(node_divisor))))
num_chunks = len(node_chunks)
pedgelists = p.map(_pmap1,
zip([G]*num_chunks,
[B]*num_chunks,
[u]*num_chunks,
node_chunks))
ll = []
for l in pedgelists:
ll.extend(l)
G.add_edges_from(ll)
# compile long list
# add edges from long list in a single step
return G
OK, I was "inadvertently" trying to cProfile the parallel run on the cluster, while I was simply having test runs offline. The code runs fine, but profiling breaks down -- as it always should for parallel scripts. It is not related to the cluster or LSF. Sorry.
精彩评论