Fast way for multiple processes to read from central source in Python?


I'm looking for a fast way for multiple processes (in a multiprocessing.Pool()) to read from a central data source. Currently I have a file which is read into a queue (using multiprocessing.Manager().Queue()); a worker pool is then started and its processes read from this queue. It works fine, but when I'm working with files that are multiple GB in size this becomes a problem, as the managed queue is ~7 times slower than a regular Python queue.

I think this is because the manager lives in a separate process, so every queue operation has to travel over a socket rather than going directly through memory.

Here is the code I am using (the get_records function just reads the bytestream for each record from the file and returns it):

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))
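
For context, the rest of the setup looks roughly like this (a minimal sketch; the worker function, the number of workers and process_record are placeholders for the real per-record work):

from multiprocessing import Manager, Pool

NUM_WORKERS = 4

def worker(q):
    # drain the shared queue until a None sentinel arrives
    while True:
        record = q.get()
        if record is None:
            break
        process_record(record)  # placeholder for the real work

manager = Manager()
mgr_q = manager.Queue()

pool = Pool(NUM_WORKERS)
for _ in range(NUM_WORKERS):
    pool.apply_async(worker, (mgr_q,))

map(mgr_q.put, get_records(f))  # producer: feed every record to the queue
for _ in range(NUM_WORKERS):
    mgr_q.put(None)             # one sentinel per worker

pool.close()
pool.join()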

So maybe there's a better way to handle this?


Here are some stats about the speed of reading one of my data files (~3GB) into various data types:

Reading into a regular Python list. Rate is 229.377 MB/sec

l = []
map(l.append, get_records(f))

Reading into a regular queue. Rate is 74.035 MB/sec

import Queue
q = Queue.Queue()
map(q.put, get_records(f))

Reading into a multiprocessing.Queue. Rate is 67.718 MB/sec

from multiprocessing import Queue
mq = Queue()
map(mq.put, get_records(f))

Finally, reading into a managed queue. Rate is 9.568 MB/sec

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))

Rates are calculated as rate = filesize / duration / 1024 / 1024
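
Rates like the ones above can be measured with a simple harness along these lines (a sketch; the path and get_records are stand-ins for the actual file and reader):

import os
import time

path = 'records.dat'                       # stand-in for the ~3GB data file
filesize = os.path.getsize(path)
f = open(path, 'rb')
start = time.time()
l = []
map(l.append, get_records(f))              # the variant being timed
duration = time.time() - start
rate = filesize / duration / 1024 / 1024   # bytes/sec -> MB/sec
print('Rate is %.3f MB/sec' % rate)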


If you're only doing reads on the file, it's safe to have multiple processes read it at the same time. Instead of passing the data through the queue, just pass each worker an offset and a count. Then in the workers do:

f.seek(offset)           # jump to this worker's chunk of the file
get_records(f, count)    # read only that worker's records
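
A fuller sketch of that idea, assuming the (offset, count) pairs marking the record boundaries can be computed up front and that get_records accepts a count; process_record, the file path and the chunk values are hypothetical:

from multiprocessing import Pool

def worker(args):
    offset, count = args
    # each worker opens its own handle, so seeks don't interfere
    with open('records.dat', 'rb') as f:    # hypothetical file path
        f.seek(offset)
        for record in get_records(f, count):
            process_record(record)          # hypothetical per-record work

# (offset, count) pairs partitioning the file; these values are made up
chunks = [(0, 100000), (1073741824, 100000), (2147483648, 100000)]

pool = Pool()
pool.map(worker, chunks)
pool.close()
pool.join()

This way only tiny (offset, count) tuples cross the process boundary, so the manager's socket overhead disappears entirely.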
