Python - multiple simultaneous threadpools

开发者 https://www.devze.com 2023-03-06 20:42 Source: the web
I'm writing a web scraper in Python, using httplib2 and lxml (yes, I know I could be using Scrapy; let's move past that). The scraper has about 15,000 pages to parse into approximately 400,000 items. I've got the item-parsing code running almost instantaneously, but the portion that downloads each page from the server is still extremely slow. I'd like to overcome that through concurrency. However, I can't rely on EVERY page needing to be parsed EVERY time.

I've tried a single ThreadPool (like multiprocessing.Pool, but backed by threads, which should be fine since this is an I/O-bound process), but I couldn't think of a graceful (or working) way of getting ALL of the threads to stop once the date of the last index item was greater than that of the item we were processing.

Right now, I'm working on a method using two instances of ThreadPool: one to download each page, and another to parse the pages. A simplified code example is:

#! /usr/bin/env python2

import httplib2
from Queue import PriorityQueue
from multiprocessing.pool import ThreadPool
from lxml.html import fromstring

pages = [x for x in range(1000)]
page_queue = PriorityQueue(1000)

url = "http://www.google.com"

def get_page(page):
    #Grabs google.com
    h = httplib2.Http(".cache")
    resp, content = h.request(url, "GET")
    tree = fromstring(str(content), base_url=url)
    page_queue.put((page, tree))
    print page_queue.qsize()

def parse_page():
    page_num, page = page_queue.get()
    print "Parsing page #" + str(page_num)
    #do more stuff with the page here
    page_queue.task_done()

if __name__ == "__main__":
    collect_pool = ThreadPool()
    collect_pool.map_async(get_page, pages)
    collect_pool.close()

    parse_pool = ThreadPool()
    parse_pool.apply_async(parse_page)
    parse_pool.close()


    parse_pool.join()
    collect_pool.join()
    page_queue.join()

Running this code, however, doesn't do what I expect, which is to fire off two thread pools: one populating a queue and another pulling from it to parse. Instead, it starts the collect_pool and runs through it, and then starts the parse_pool and runs through it (I assume; I've not let the code run long enough to get to the parse_pool. The point is that collect_pool is all that seems to be running). I'm fairly sure I've messed something up with the order of the calls to join(), but I can't for the life of me figure out what order they're supposed to be in.

My question is essentially this: am I barking up the right tree here? If so, what the hell am I doing wrong? If not, what would your suggestions be?


First of all, your design seems to be correct at a high level. Using a thread pool to collect the pages is justified by the synchronous nature of the httplib2 module. (With an asynchronous library one thread would be enough. Note that even with httplib2 and the pool, at most one collector thread executes Python code at any instant because of the GIL; the GIL is released while a thread blocks on network I/O, which is why the downloads can still overlap.) The parsing pool is justified by the lxml module being built on C libraries (libxml2 and libxslt), assuming that the Global Interpreter Lock is therefore released while a page is parsed; this should be checked in the lxml docs or code. If that were not true, a dedicated parsing pool would bring no performance gain, as only one thread at a time would be able to acquire the GIL. In that case it would be better to use a process pool.

I am not familiar with the ThreadPool implementation, but I assume it is analogous to the Pool class in the multiprocessing module. On that basis, the problem appears to be that you submit only a single work item to the parse_pool: after parse_page processes the first page, it never tries to dequeue further pages from page_queue. No additional work items are submitted to this pool either, so processing stops, and after the parse_pool.close() call the threads of the (now idle) pool terminate.
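
A minimal sketch of this failure mode, assuming a plain in-memory queue of integers in place of real pages (the `parsed` list and the pool size of 2 are illustrative and not part of the original code; written for Python 3). A single apply_async(parse_page) call runs parse_page exactly once, so exactly one item leaves the queue:

```python
from multiprocessing.pool import ThreadPool
from queue import Queue  # this module is named Queue in Python 2

page_queue = Queue()
for page_num in range(5):
    page_queue.put(page_num)  # stand-ins for downloaded pages

parsed = []

def parse_page():
    # Dequeues one item and returns; nothing loops or resubmits.
    parsed.append(page_queue.get())
    page_queue.task_done()

parse_pool = ThreadPool(2)
parse_pool.apply_async(parse_page)  # one work item -> one call
parse_pool.close()
parse_pool.join()

remaining = page_queue.qsize()
print(len(parsed), remaining)  # 1 item parsed, 4 still queued
```

In the original script the final page_queue.join() would therefore block forever, because task_done() is never called for the items that stay in the queue.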

The solution is to eliminate the page_queue. The get_page() function should put a work item on the parse_pool by calling apply_async() for every page it collects, instead of feeding them into page_queue.

The main thread should wait until the collect_pool has finished all of its work (i.e. until the collect_pool.join() call has returned), and only then close the parse_pool, since at that point we can be sure that no more work will be submitted to the parser. It should then wait for the parse_pool to drain by calling parse_pool.join(), and then exit.
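
One way the restructured flow might look, as a sketch rather than a drop-in fix. Here `fetch` is a hypothetical stand-in for the httplib2 download and lxml parse (it only builds a string, so the example runs offline), and the pool sizes, the `results` list and its lock are illustrative assumptions. Written for Python 3:

```python
import threading
from multiprocessing.pool import ThreadPool

pages = list(range(20))
results = []
results_lock = threading.Lock()

collect_pool = ThreadPool(8)  # sizes chosen arbitrarily for the sketch
parse_pool = ThreadPool(4)

def fetch(page):
    # Hypothetical placeholder for h.request(...) + fromstring(...).
    return "<html>page %d</html>" % page

def parse_page(page, content):
    # Real parsing would go here; the sketch only records the page.
    with results_lock:
        results.append((page, len(content)))

def get_page(page):
    content = fetch(page)
    # Hand the page straight to the parser pool; no intermediate queue.
    parse_pool.apply_async(parse_page, (page, content))

pending = collect_pool.map_async(get_page, pages)
collect_pool.close()
collect_pool.join()  # every get_page call has returned
pending.get()        # re-raises any exception from get_page

parse_pool.close()   # safe now: no more work can be submitted
parse_pool.join()

print(len(results))
```

Calling pending.get() after the join is optional, but without it an exception raised inside get_page would pass silently.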

Furthermore, you need to increase the number of threads in the collect_pool in order to process more HTTP requests concurrently. The default number of threads in a pool is the number of CPUs, so currently you cannot have more than that many requests in flight. You may experiment with values up to thousands or tens of thousands; observe the CPU consumption of the pool, which should not approach one full CPU.
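
A sketch of sizing the pool explicitly, assuming time.sleep as a stand-in for a blocking HTTP request (`fake_download`, the figure of 50 threads and the 0.2 s delay are all arbitrary choices for illustration; written for Python 3). A thread that is sleeping or waiting on a socket releases the GIL, so the waits overlap:

```python
import time
from multiprocessing.pool import ThreadPool

def fake_download(page):
    time.sleep(0.2)  # stand-in for a blocking h.request(...) call
    return page

pages = list(range(50))

# Explicit size instead of the default (the number of CPUs).
collect_pool = ThreadPool(processes=50)

start = time.time()
fetched = collect_pool.map(fake_download, pages)
elapsed = time.time() - start

collect_pool.close()
collect_pool.join()

print("fetched %d pages in %.2f s" % (len(fetched), elapsed))
```

With one thread per page, the run should take far less than the roughly 10 s that 50 sequential sleeps would need. Real downloads will be limited by the server and the network rather than by the thread count alone.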
