Python Multi-Processing Question?

Developer https://www.devze.com 2023-03-16 09:36 Source: Internet
I have a folder with 500 input files (total size of all files is ~500 MB).

I'd like to write a Python script that does the following:

(1) load all of the input files into memory

(2) initialize an empty Python list that will be used later ... see bullet (4)

(3) start 15 different (independent) processes: each of these uses the same input data [from (1)], yet uses a different algorithm to process it, thus generating different results

(4) have all the independent processes [from step (3)] store their output in the same Python list [the list that was initialized in step (2)]

Once all 15 processes have completed their run, I will have one python list that includes the results of all the 15 independent processes.

My question is: is it possible to do the above efficiently in Python? If so, can you provide a scheme / sample code that illustrates how to do so?

Note #1: I will be running this on a strong, multi-core server; so the goal here is to use all the processing power while sharing some memory {input data, output list} among all the independent processes.

Note #2: I am working in a Linux environment
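
For reference, steps (1)-(4) can be sketched with only the standard library. This is a minimal sketch, not a definitive implementation: `load_inputs`, `algorithm`, and `run_all` are hypothetical stand-ins for the real file loading and the 15 algorithms, and the `fork` start method is assumed because of the Linux environment in Note #2. Forked children inherit the parent's `INPUT_DATA` instead of receiving a pickled copy, although CPython's reference counting can still cause some memory pages to be copied over time.

```python
import multiprocessing as mp

# Hypothetical stand-in for the real input; filled in by the parent
# before the workers are forked.
INPUT_DATA = {}


def load_inputs():
    """Pretend to read the input files; returns {filename: contents}."""
    return {"file_%03d" % i: "x" * (i + 1) for i in range(5)}


def algorithm(index):
    """Stand-in for one of the 15 algorithms; only reads the shared input."""
    total = sum(len(text) for text in INPUT_DATA.values())
    return (index, total * (index + 1))


def run_all(n_algorithms=15):
    global INPUT_DATA
    INPUT_DATA = load_inputs()        # step (1): load once, in the parent
    ctx = mp.get_context("fork")      # assumption: Linux, so fork is available
    with ctx.Pool(processes=n_algorithms) as pool:
        # steps (2)-(4): map() gathers every return value into one list
        return pool.map(algorithm, range(n_algorithms))


if __name__ == "__main__":
    print(run_all())
```

Here the "shared output list" is simply the list returned by `pool.map`, built in the parent process, so the workers never have to write to shared memory themselves.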


OK, I just whipped this up using ZeroMQ to demonstrate a single subscriber receiving from multiple publishers. You could probably do the same with queues, but you would need to manage them a bit more. ZeroMQ sockets just work, which makes them nice for things like this, IMO.

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
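        # give the subscriber time to connect: a PUB socket drops
        # messages sent before a subscriber has joined
        time.sleep(1)
        # read bytes so each line can be sent as-is (zmq sends bytes, not text)
        with open(self._filename, 'rb') as my_input:
            for line in my_input:
                result_publisher.send(line)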

if __name__ == '__main__':
    import sys
    import os
    import zmq

    # assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file that exists, passing in a bind
    # address that tells the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i))
               for i, f in enumerate(x for x in files if os.path.exists(x))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    # an empty prefix subscribes to every message; it must be bytes on Python 3
    result_subscriber.setsockopt(zmq.SUBSCRIBE, b"")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print(w._bind)
        result_subscriber.connect(w._bind)

    # start workers
    print("starting workers...")
    for w in workers:
        w.start()

    result = []

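    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive; poll with a timeout so the loop
    # cannot block forever in recv() once the last worker has exited
    while True:
        if result_subscriber.poll(100):  # wait up to 100 ms for a message
            result.append(result_subscriber.recv())
        elif not any(w.is_alive() for w in workers):
            break

    # output the result
    print(result)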

Oh, and to get zmq, just run:

$ pip install pyzmq-static
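
For comparison, the queue-based alternative mentioned above might look roughly like the following. It is only a sketch under stated assumptions: `worker` and `collect` are hypothetical names, the upper-casing stands in for real processing, and the `fork` start method is assumed because the question targets Linux. The extra management shows up as the per-worker sentinel, which tells the parent when it can stop reading.

```python
import multiprocessing as mp


def worker(worker_id, lines, results):
    """Hypothetical worker: upper-cases its lines and queues each result."""
    for line in lines:
        results.put((worker_id, line.upper()))
    results.put((worker_id, None))  # sentinel: this worker is finished


def collect(inputs):
    ctx = mp.get_context("fork")  # assumption: Linux, so fork is available
    results = ctx.Queue()
    workers = [ctx.Process(target=worker, args=(i, lines, results))
               for i, lines in enumerate(inputs)]
    for w in workers:
        w.start()

    collected, finished = [], 0
    # Drain the queue until every worker has sent its sentinel; joining
    # before draining can deadlock if a worker still has queued data.
    while finished < len(workers):
        worker_id, item = results.get()
        if item is None:
            finished += 1
        else:
            collected.append((worker_id, item))
    for w in workers:
        w.join()
    return collected


if __name__ == "__main__":
    print(sorted(collect([["a", "b"], ["c"]])))
```

Results from different workers arrive interleaved, so each item is tagged with its `worker_id` to keep track of which process produced it.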