开发者

Multiprocessing priority queue in Python does not always get correct priority item

开发者 https://www.devze.com 2022-12-07 21:44 出处:网络
The values that .get() returns from my priority queue are ~70% of the time 100% correctly sorted, but ~30% of the time they will only be ~70% sorted correctly, with most of the elements correct but a

The values that .get() returns from my priority queue are ~70% of the time 100% correctly sorted, but ~30% of the time they will only be ~70% sorted correctly, with most of the elements correct but a few elements shuffled incorrectly. Most often when they are returned incorrectly, the first item returned from the priority queue is incorrect and it corresponds to the first (or one of the first) item placed in the priority queue. I think that should be a hint to the problem but I don't know why that is.

I followed this answer to create the multiprocessing priority queue. I tried having the priority queue take in tuples of (priority, data), I also tried using a @dataclass... same result.

I wrote a unit test of sorts for the answer mentioned above. If you run it a few times, eventually you should see an assertion error. Perhaps the answered PriorityQueue isn't actually thread/process safe? If so, how can I make it thread/process safe?

My "minimal" example is below. I have verified the issue in both Python 3.7.13 and 3.10.7.

"""
Test the PriorityQueue class by making a writer process to write data into the
PriorityQueue and a reader process to read data from the PriorityQueue.
Ensure the data read out of the PriorityQueue is sorted by priority
"""

from __future__ import annotations      # For type hinting the values in PriorityQueue
from queue import Queue, PriorityQueue
from typing import Union, Any
import numpy as np

from multiprocessing import Process
from dataclasses import dataclass, field
from multiprocessing.开发者_JAVA技巧managers import SyncManager
from functools import total_ordering

# Via: https://stackoverflow.com/a/25328987/4368898
class PQManager(SyncManager):
    pass

PQManager.register("PriorityQueue", PriorityQueue)

def Manager():
    m = PQManager()
    m.start()
    return m

# Not necessary but recommended from:
#   https://docs.python.org/3/library/queue.html#queue.PriorityQueue
@dataclass(order=True)
class PrioritizedItem:
    """
    Prioritized item for priority queue as recommended at:
        https://docs.python.org/3/library/queue.html#queue.PriorityQueue
    """
    priority: float
    data: Any=field(compare=False)
    def __init__(self, priority: float, data: int = 0):
        self.priority = priority
        self.data = data


def writer_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    num_examples: int = 5) -> None:
    """
    Write data to the priority queue with random priorities
    The final item placed in the priority queue will have a priority of 999.0
        (least priority) with data of 12345
    """
    for i in range(num_examples-1):
        # priority_number: float = round(np.random.normal(), 3)
        a = [-0.954,-1.3,-0.785,-1.324,0.576,1.909,-0.296,0.781,-0.209,-0.492]
        priority_number = a[i]
        print(f"Placing into the PriorityQueue: {priority_number}")
        priority_queue_under_test.put(PrioritizedItem(priority_number, 0))
    priority_queue_under_test.put(PrioritizedItem(999.0, 12345))    # Signal end of queue

def reader_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    queue_of_priorities: Queue[float],
                    queue_of_data: Queue[int]):
    """
    Reads values from the priority_queue_under_test and then places the read
        priorities and data into queue_of_priorities and queue_of_data, respectively.
    Will read until it sees data == 12345
    """
    while True:
        if not priority_queue_under_test.empty():
            prioritized_item: PrioritizedItem = priority_queue_under_test.get()
            p = prioritized_item.priority
            d = prioritized_item.data

            # print(f"Read priority {p}")
            queue_of_priorities.put(p)
            queue_of_data.put(d)
            if d == 12345:
                return

if __name__ == "__main__":

    NUM_EXAMPLES: int = 10

    # Make Queues
    manager = Manager()
    # Entries are typically tuples of the form: (priority number, data)
    # Lowest priority number gets popped first
    priority_queue_under_test = manager.PriorityQueue()
    queue_of_priorities = manager.Queue()
    queue_of_data = manager.Queue()

    # Create processes
    writer_process = Process(target=writer_function, args=(priority_queue_under_test,
                                                              NUM_EXAMPLES,))
    reader_process = Process(target=reader_function, args=(priority_queue_under_test,
                                                              queue_of_priorities,
                                                              queue_of_data,))

    # Start processes
    writer_process.start()
    reader_process.start()

    # Wait for processes to end
    writer_process.join()
    reader_process.join()

    # Read priorities and data from the queues
    priorities = []
    data = []
    while not queue_of_priorities.empty():
        priorities.append(queue_of_priorities.get())
        data.append(queue_of_data.get())
        # print(f"Read priority {priorities[-1]}")

    print(f"Priorities of the items in the PriorityQueue as they were popped "
          f"off of the queue via .get():\n\t{priorities}")
    print(f"Data contained in the PriorityQueue:\n\t{data}")
    print(f"Number of items: {len(data)}")

    # Ensure that the queues have been fully read from and the
    #   priorities read off of the priority queue are sorted low to high
    assert queue_of_priorities.empty()
    assert queue_of_data.empty()
    assert len(priorities) == len(data) == NUM_EXAMPLES
    assert sorted(priorities) == priorities

    print("All checks have passed successfully!")

Good, expected output (~70% of the time):

# python3 test_priority_queue.py 
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
        [-1.324, -1.3, -0.954, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
All checks have passed successfully!

Bad, not completely sorted output (~30% of the time):

# python3 test_priority_queue.py 
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
        [-0.954, -1.324, -1.3, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
Traceback (most recent call last):
  File "test_priority_queue.py", line 129, in <module>
    assert sorted(priorities) == priorities
AssertionError
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号