
dividing and conquering etree.iterparse using multiprocessing

开发者 https://www.devze.com 2023-02-05 22:37 Source: web

so let's imagine a large xml document (filesize > 100 mb) that we want to iterparse using cElementTree.iterparse.

But if all those cores Intel promised us are to be worthwhile, how do we put them to use? Here's what I want:

from itertools import islice
from xml.etree import ElementTree as etree

tree_iter = etree.iterparse(open("large_file.xml", encoding="utf-8"))

first = islice(tree_iter, 0, 10000)
second = islice(tree_iter, 10000)

parse_first()
parse_second()

There seem to be several problems with this, not least that the iterator returned by iterparse() seems to resist slicing.

Is there any way to divide the parsing workload of a large XML document into two or four separate tasks (without loading the entire document into memory)? The purpose is then to execute the tasks on separate processors.
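For what it's worth, a minimal sketch of one possible approach (not necessarily the best one): keep the sequential iterparse in the parent process, and ship each completed element to a multiprocessing.Pool as a serialized byte string, since Element objects themselves can't be sent between processes directly. The names `process_record`, `parse_in_parallel`, and the `record` tag are hypothetical; per-record work would go in `process_record`:

```python
import io
from multiprocessing import Pool
from xml.etree import ElementTree as etree

def process_record(xml_bytes):
    # Hypothetical per-record work: re-parse the fragment in the worker
    # process and extract something from it (here, just the tag name).
    elem = etree.fromstring(xml_bytes)
    return elem.tag

def parse_in_parallel(source, record_tag="record", workers=4):
    def records():
        # iterparse runs sequentially in the parent; only the serialized
        # fragments cross the process boundary.
        for event, elem in etree.iterparse(source):
            if elem.tag == record_tag:
                yield etree.tostring(elem)
                elem.clear()  # free memory as we go
    with Pool(workers) as pool:
        return list(pool.imap_unordered(process_record, records()))

if __name__ == "__main__":
    doc = io.BytesIO(b"<root>" + b"<record/>" * 10 + b"</root>")
    print(parse_in_parallel(doc))
```

The serialization round-trip adds overhead, so this only pays off when the per-record work is substantially heavier than the parse itself.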


I think you need a good thread pool with a task queue for this. I found (and use) this very good one (it's written for Python 3, but shouldn't be too hard to convert to 2.x):

# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    """Thread that pulls (func, args, kwargs) tasks from a shared queue."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True  # don't block interpreter exit
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as exception:
                print(exception)
            self.tasks.task_done()

class ThreadPool:
    """Pool of Worker threads consuming tasks from a bounded queue."""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Block until all queued tasks have been processed."""
        self.tasks.join()

Now you can just run the loop on the iterparse and let the thread pool divide the work for you. Using it is as simple as this:

def executetask(arg):
    print(arg)

workers = ThreadPool(4) # 4 is the number of threads
for i in range(100): workers.add_task(executetask, i)

workers.wait_completion() # optional: only needed if you must be certain all work is done before continuing
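To tie this back to the original question, here is a sketch of feeding iterparse output through the pool. The Worker/ThreadPool classes from the recipe are repeated so the snippet runs standalone; `handle_item` and the `item` tag are hypothetical stand-ins for real per-element work. One caveat: because of CPython's GIL, threads only give a real speedup if the per-element work releases the GIL (I/O, C extensions); otherwise processes, as in the question, are the better fit.

```python
import io
from queue import Queue
from threading import Thread
from xml.etree import ElementTree as etree

# The recipe's classes, repeated so this snippet is self-contained.
class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as exception:
                print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

results = []

def handle_item(tag, text):
    # Hypothetical per-element work; list.append is atomic in CPython,
    # so no extra locking is needed for this toy example.
    results.append((tag, text))

pool = ThreadPool(4)
source = io.BytesIO(b"<root><item>a</item><item>b</item></root>")
for event, elem in etree.iterparse(source):
    if elem.tag == "item":
        # Pass plain values, not the Element, so clear() below is safe.
        pool.add_task(handle_item, elem.tag, elem.text)
        elem.clear()  # free the element once dispatched
pool.wait_completion()
```

The main thread stays the sole reader of the iterparse stream (which is inherently sequential) and only the dispatched work is parallelized.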
