开发者

Why is there no speed-up when using pythons multiprocessing for embarassingly parallel problem within a for-loop, with shared numpy data?

开发者 https://www.devze.com 2023-01-29 08:54 出处:网络
I want to speed up an embarassingly parallel problem related to Bayesian Inference. The aim is to infer coefficents u for a set of images x, given a matrix A, such that X = A*U.

I want to speed up an embarassingly parallel problem related to Bayesian Inference. The aim is to infer coefficents u for a set of images x, given a matrix A, such that X = A*U. X has dimensions mxn, A mxp and U pxn. For each column of X, one has to infer the optimal corresponding column of the coefficients U. In the end, this information is used to update A. I use m = 3000, p = 1500 and n = 100. So, as it is a linear model, the inference of the coefficient-matrix u consists of n independent calculations. Thus, I tried to work with the multiprocessing module of Python, but there is no speed up.

Here is what I did:

The main structure, without parallelization, is:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, upda开发者_如何学Gote and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

Implementation of multiprocessing:

I tried to implement multiprocessing. I have an 8-core machine that I can use.

  1. There are 3 for-loops. The only one that seems to be "parallelizable" is the third one, where the coefficients are inferred:
    • Generate a Queue and stack the iteration-numbers from 0 to batch_size-1 into the Queue
    • Generate 8 processes, and let them work through the Queue
  2. Share the data U using multiprocessing.Array

So, I replaced this third loop with the following:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

Here is the class Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

And here is the function infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

The problem now are the following:

  1. The multiprocessing version is not faster than the single thread version for the given dimensions of the data.
  2. The process ID's increase with every iteration. Does this mean that there is constantly a new process generated? Doesn't this generate a huge overhead? How can I avoid that? Is there a possibility of creating within the whole for-loop 8 different processes and just update them with the data?
  3. Does the way I share the coefficients U amongst the processes slow the calculation down? Is there another, better way of doing this?
  4. Would a Pool of processes be better?

I am really thankful for any sort of help! I have started working with Python a month ago, and am pretty lost now.

Engin


Every time you create a Process you are creating a new process. If you're doing that within your for loop, then yes, you are starting new processes every time through the loop. It sounds like what you want to do is initialize your Queue and Processes outside of the loop, then fill the Queue inside the loop.

I've used multiprocessing.Pool before, and it's useful, but it doesn't offer much over what you've already implemented with a Queue.


Eventually, this all boils down to one question: Is it possible to start processes outside of the main for-loop, and for every iteration, feed the updated variables in them, have them processing the data, and collecting the newly calculated data from all of the processes, without having to start new processes every iteration?

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号