How to check if a task is already in a Python Queue?

I'm writing a simple crawler in Python using the threading and Queue modules. I fetch a page, check its links and put them into a queue; when a thread has finished processing a page, it grabs the next one from the queue. I'm using an array for the pages I've already visited to filter the links I add to the queue, but if there is more than one thread and they find the same links on different pages, they put duplicate links into the queue. So how can I find out whether some URL is already in the queue, to avoid putting it there again?


If you don't care about the order in which items are processed, I'd try a subclass of Queue that uses a set internally:

from queue import Queue  # on Python 2: from Queue import Queue

class SetQueue(Queue):

    def _init(self, maxsize):
        # Queue.__init__ calls _init to create the underlying container;
        # use a set instead of the default deque
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)      # adding an existing item is a no-op

    def _get(self):
        return self.queue.pop()   # pops an arbitrary item, so FIFO order is lost

As Paul McGuire pointed out, this would allow adding a duplicate item after it has been removed from the "to-be-processed" set but before it is added to the "processed" set. To solve this, you can store both sets in the Queue instance, and since it is the larger "all items ever seen" set that you check against, you may as well go back to a regular queue for the pending items, which also keeps them properly ordered.

class SetQueue(Queue):

    def _init(self, maxsize):
        Queue._init(self, maxsize)   # keep the default deque so FIFO order is preserved
        self.all_items = set()       # every item ever put on the queue

    def _put(self, item):
        if item not in self.all_items:
            Queue._put(self, item)
            self.all_items.add(item)

The advantage of this, as opposed to using a set separately, is that the Queue's methods are thread-safe, so that you don't need additional locking for checking the other set.
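As an illustration, a minimal usage sketch, assuming the second SetQueue above and Python 3's queue module; the duplicate put never reaches the underlying deque, although unfinished_tasks is still incremented, which is the join problem addressed in the next answer:

q = SetQueue()
q.put("http://example.com/a")
q.put("http://example.com/a")   # duplicate: _put skips it, so it is queued only once
q.put("http://example.com/b")

print(q.qsize())                # 2
print(q.get(), q.get())         # http://example.com/a http://example.com/b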


The put method also needs to be overridden; otherwise a join call will block forever: https://github.com/python/cpython/blob/master/Lib/queue.py#L147

from queue import Queue

class UniqueQueue(Queue):

    def put(self, item, block=True, timeout=None):
        if item not in self.queue:  # skip duplicates here so unfinished_tasks stays correct (fixes the join bug)
            Queue.put(self, item, block, timeout)

    def _init(self, maxsize):
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)

    def _get(self):
        return self.queue.pop()


What follows is an improvement over Lukáš Lalinský's latter solution. The important difference is that put is overridden in order to ensure unfinished_tasks is accurate and join works as expected.

from queue import Queue

class UniqueQueue(Queue):

    def _init(self, maxsize):
        self.all_items = set()
        Queue._init(self, maxsize)

    def put(self, item, block=True, timeout=None):
        # Duplicates never reach Queue.put, so unfinished_tasks is only
        # incremented for items that are actually queued and join() works.
        if item not in self.all_items:
            self.all_items.add(item)
            Queue.put(self, item, block, timeout)
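
A quick sanity check of the join behaviour, sketched under the assumption that the UniqueQueue class above is in scope:

q = UniqueQueue()
q.put("http://example.com/a")
q.put("http://example.com/a")   # rejected before Queue.put, so unfinished_tasks is not incremented
q.put("http://example.com/b")

while not q.empty():
    q.get()
    q.task_done()

q.join()                        # returns immediately: two tasks counted, two marked done
print("all tasks done")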


SQLite is so simple to use and would fit perfectly... just a suggestion.
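
For instance, a minimal sketch of that idea (the file, table and column names are placeholders): a UNIQUE constraint turns the "have I seen this URL?" check into a single INSERT.

import sqlite3

# Hypothetical single-table schema; the UNIQUE constraint does the de-duplication.
conn = sqlite3.connect("crawler.db")
conn.execute("CREATE TABLE IF NOT EXISTS seen (url TEXT UNIQUE)")

def mark_if_new(url):
    """Record the URL and return True only the first time it is seen."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO seen (url) VALUES (?)", (url,))
        return True
    except sqlite3.IntegrityError:  # UNIQUE constraint violated: already recorded
        return False

print(mark_if_new("http://example.com/"))  # True
print(mark_if_new("http://example.com/"))  # False

In a multithreaded crawler you would either open one connection per thread or pass check_same_thread=False and guard the connection with a lock.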


The way I solved this (actually I did this in Scala, not Python) was to use both a Set and a Queue, only adding links to the queue (and set) if they did not already exist in the set.

Both the set and queue were encapsulated in a single thread, exposing only a queue-like interface to the consumer threads.

Edit: someone else suggested SQLite and that is also something I am considering, if the set of visited URLs needs to grow large. (Currently each crawl is only a few hundred pages so it easily fits in memory.) But the database is something that can also be encapsulated within the set itself, so the consumer threads need not be aware of it.
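
A rough Python sketch of that encapsulation, substituting a lock around both containers for the dedicated thread described above (the Frontier name and its methods are made up for illustration):

from collections import deque
from threading import Lock

class Frontier:
    """Queue-like interface; the set and the deque are never exposed directly."""

    def __init__(self):
        self._lock = Lock()
        self._seen = set()       # every URL ever enqueued
        self._pending = deque()  # URLs waiting to be crawled

    def add(self, url):
        with self._lock:
            if url not in self._seen:
                self._seen.add(url)
                self._pending.append(url)

    def next(self):
        with self._lock:
            return self._pending.popleft() if self._pending else None

frontier = Frontier()
frontier.add("http://example.com/")
frontier.add("http://example.com/")  # ignored, already seen
print(frontier.next())               # http://example.com/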


use:

url in q.queue

which returns True iff url is in the queue
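
For example (q.queue is the Queue's internal deque, so the membership test is linear time and is not guarded by the queue's lock; in a threaded crawler treat it as a best-effort check):

from queue import Queue

q = Queue()
q.put("http://example.com/a")

url = "http://example.com/a"
if url not in q.queue:   # q.queue is the underlying collections.deque
    q.put(url)

print(q.qsize())         # still 1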


Why use the array (ideally, a dictionary would be even better) only to filter things you've already visited? Add things to your array/dictionary as soon as you queue them up, and only add them to the queue if they're not already in the array/dict (see the sketch after this list). Then you have three simple, separate states:

  1. Links not yet seen (neither in queue nor array/dict)
  2. Links scheduled to be visited (in both queue and array/dict)
  3. Links already visited (in array/dict, not in queue)
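
A minimal sketch of that bookkeeping, with a set standing in for the array/dict suggested above (in the threaded crawler, access to seen would need a lock, or all queuing would be done by one thread):

from queue import Queue

seen = set()        # links scheduled or already visited
frontier = Queue()  # links scheduled to be visited

def schedule(url):
    """Queue a link only the first time it is discovered."""
    if url not in seen:
        seen.add(url)
        frontier.put(url)

schedule("http://example.com/")
schedule("http://example.com/")  # already scheduled, ignored
print(frontier.qsize())          # 1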


This is a full version of SetQueue:

import Queue  # Python 2; on Python 3 use "import queue" and subclass queue.Queue

class SetQueue(Queue.Queue):
    def _init(self, maxsize):
        Queue.Queue._init(self, maxsize)
        self.all_items = set()          # items currently in the queue

    def _put(self, item):
        if item not in self.all_items:  # only queue items that are not already pending
            Queue.Queue._put(self, item)
            self.all_items.add(item)

    def _get(self):
        item = Queue.Queue._get(self)
        self.all_items.remove(item)     # once fetched, the item may be queued again later
        return item


instead of "array of pages already visited" make an "array of pages already added to the queue"


Sadly, I don't have enough reputation to comment on Lukáš Lalinský's answer.

To add support for SetQueue.task_done() and SetQueue.join() in the second variant of Lukáš Lalinský's SetQueue, add an else branch to the if:

def _put(self, item):
    if item not in self.all_items:
        Queue._put(self, item)
        self.all_items.add(item)
    else:
        # Queue.put increments unfinished_tasks after calling _put, so cancel
        # that increment for the duplicate we just dropped; join() then waits
        # only for items that were actually queued.
        self.unfinished_tasks -= 1

Tested and works with Python 3.4.
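
A quick sketch of what the fix buys you, assuming the second SetQueue above with this _put patch applied (Python 3):

q = SetQueue()
q.put("http://example.com/a")
q.put("http://example.com/a")   # dropped by _put, and the unfinished_tasks increment is cancelled
q.get()
q.task_done()
q.join()                        # returns: one task counted, one task done
print("join returned")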


I agree with @Ben James: try using both a deque and a set.

Here is the code:

from collections import deque
from queue import Queue

class SetUniqueQueue(Queue):

    def _init(self, maxsize):
        self.queue = deque()    # preserves FIFO order
        self.setqueue = set()   # remembers every item ever queued

    def _put(self, item):
        if item not in self.setqueue:
            self.setqueue.add(item)
            self.queue.append(item)

    def _get(self):
        return self.queue.popleft()


This is my code. Hopefully, it can help someone.

I added a set as a cache to the Queue class. The cache is used to check task uniqueness and to implement the __contains__ magic method on the queue.

Uniqueness can be defined in two ways. First, tasks are unique over the whole life of the queue: the queue rejects a repeated task even after the original has been processed and removed. I implemented this as "be_unique_in_all_items". Second, tasks are unique only among the tasks currently in the queue, so a task can be accepted again after it is done. I implemented this as "be_unique_in_existing_items".

from queue import Queue
from traceback import print_exc


class MQueue(Queue):

    def __init__(self, **kwargs):
        super().__init__(maxsize=kwargs.get("maxsize", 0))

        self._be_unique_in_existing_items = kwargs.get("be_unique_in_existing_items", False)
        self._be_unique_in_all_items = kwargs.get("be_unique_in_all_items", False)

        if self._be_unique_in_existing_items and self._be_unique_in_all_items:
            raise ValueError("Choose one criterion")

        self.cache = set()

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)

        if self._be_unique_in_existing_items:
            # Forget the item so it can be queued again later; discard() is used
            # (rather than an "if item:" check) so falsy items such as 0 or "" work too.
            self.cache.discard(item)

        return item

    def put(self, item, *args, **kwargs):
        if self._be_unique_in_existing_items or self._be_unique_in_all_items:
            if item in self.cache:
                raise ValueError("The given item exists in cache.")
            self.cache.add(item)

        return super().put(item, *args, **kwargs)

    def __contains__(self, item):
        if self._be_unique_in_existing_items or self._be_unique_in_all_items:
            return item in self.cache
        # No cache is kept in plain-queue mode; Queue defines no __contains__,
        # so this deliberately raises an AttributeError.
        return Queue.__contains__(item)


if __name__ == "__main__":
    # ordinary queue
    ordinary_queue_obj = MQueue(maxsize=0)

    ordinary_queue_obj.put(1)
    ordinary_queue_obj.put(1)

    try:
        print(1 in ordinary_queue_obj)
    except Exception:
        print_exc()

    # be unique in existing queue
    unique_in_existing_queue_obj = MQueue(maxsize=0,
                                          be_unique_in_existing_items=True)

    unique_in_existing_queue_obj.put(1)

    print(1 in unique_in_existing_queue_obj)

    try:
        unique_in_existing_queue_obj.put(1)
    except ValueError:
        print_exc()

    task = unique_in_existing_queue_obj.get()
    unique_in_existing_queue_obj.task_done()
    unique_in_existing_queue_obj.put(task)

    # be unique in all queue
    unique_in_all_queue_obj = MQueue(maxsize=0,
                                     be_unique_in_all_items=True)

    unique_in_all_queue_obj.put(1)
    print(1 in unique_in_all_queue_obj)

    try:
        unique_in_all_queue_obj.put(1)
    except ValueError:
        print_exc()

    task = unique_in_all_queue_obj.get()
    unique_in_all_queue_obj.task_done()
    try:
        print(task in unique_in_all_queue_obj)
        unique_in_all_queue_obj.put(task)
    except ValueError:
        print_exc()

Note: a set can only contain hashable objects. For unhashable objects, use a list instead.


Also, instead of a set you might try using a dictionary. Both are hash tables under the hood, so membership checks stay fast even when they grow large, and a dictionary additionally lets you store information about each URL (for example, when it was visited).

My 2c.
