The values that .get() returns from my priority queue are fully sorted about 70% of the time. The other ~30% of the time they are only mostly sorted: most elements are in the right place but a few are out of order. When the order is wrong, it is most often the first item returned that is incorrect, and it corresponds to the first (or one of the first) items placed into the priority queue. I think that should be a hint to the problem, but I don't know why it happens.
I followed this answer (https://stackoverflow.com/a/25328987/4368898) to create the multiprocessing priority queue. I tried having the priority queue take in tuples of (priority, data), and I also tried using a @dataclass; the result is the same.
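For reference, here is a minimal single-process sketch of the two entry formats mentioned above. The names Item and drain are hypothetical and used only for illustration. With nothing running concurrently, both formats should pop in the same order:

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any


@dataclass(order=True)
class Item:  # hypothetical name, for illustration only
    priority: float
    data: Any = field(compare=False)  # excluded from ordering comparisons


def drain(pq: PriorityQueue) -> list:
    """Pop everything currently in the queue, lowest priority number first."""
    out = []
    while not pq.empty():
        out.append(pq.get())
    return out


tuple_pq: PriorityQueue = PriorityQueue()
item_pq: PriorityQueue = PriorityQueue()
for priority in (-0.954, -1.3, -0.785):
    tuple_pq.put((priority, 0))     # plain (priority, data) tuple
    item_pq.put(Item(priority, 0))  # dataclass entry

tuple_order = [p for p, _ in drain(tuple_pq)]
item_order = [item.priority for item in drain(item_pq)]
print(tuple_order)
print(item_order)
```

Since both formats behave the same here, the entry type is probably not the cause of the mis-ordering.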
I wrote a unit test of sorts for the answer mentioned above. If you run it a few times, eventually you should see an assertion error. Perhaps the answered PriorityQueue isn't actually thread/process safe? If so, how can I make it thread/process safe?
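One possible explanation, stated as an assumption rather than a confirmed diagnosis: a PriorityQueue can only order the items it holds at the moment .get() is called. If the reader calls .get() before the writer has finished, the early reads cannot account for lower priority numbers that arrive later. The single-process sketch below (pop_order and puts_before_first_get are hypothetical names) simulates that interleaving with no multiprocessing involved:

```python
from queue import PriorityQueue

PRIORITIES = [-0.954, -1.3, -0.785, -1.324, 0.576,
              1.909, -0.296, 0.781, -0.209, 999.0]


def pop_order(priorities, puts_before_first_get):
    """Simulate a reader whose first get() happens after only some puts.

    Hypothetical helper: the first `puts_before_first_get` items are put,
    one get() is performed, then the remaining items are put and the
    queue is drained.
    """
    pq = PriorityQueue()
    popped = []
    for p in priorities[:puts_before_first_get]:
        pq.put(p)
    popped.append(pq.get())  # the "reader" gets in early here
    for p in priorities[puts_before_first_get:]:
        pq.put(p)
    while not pq.empty():
        popped.append(pq.get())
    return popped


early = pop_order(PRIORITIES, 1)                 # reader wins the race
late = pop_order(PRIORITIES, len(PRIORITIES))    # writer finishes first
print(early)
print(late)
```

If this is what happens in the test, the queue itself is behaving correctly, and the assertion would only hold if the reader waited for all writes to complete before its first .get().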
My "minimal" example is below. I have verified the issue in both Python 3.7.13 and 3.10.7.
"""
Test the PriorityQueue class by making a writer process to write data into the
PriorityQueue and a reader process to read data from the PriorityQueue.
Ensure the data read out of the PriorityQueue is sorted by priority
"""
from __future__ import annotations # For type hinting the values in PriorityQueue
from queue import Queue, PriorityQueue
from typing import Union, Any
import numpy as np
from multiprocessing import Process
from dataclasses import dataclass, field
from multiprocessing.managers import SyncManager
from functools import total_ordering
# Via: https://stackoverflow.com/a/25328987/4368898
class PQManager(SyncManager):
    pass

PQManager.register("PriorityQueue", PriorityQueue)

def Manager():
    m = PQManager()
    m.start()
    return m
# Not necessary but recommended from:
# https://docs.python.org/3/library/queue.html#queue.PriorityQueue
@dataclass(order=True)
class PrioritizedItem:
    """
    Prioritized item for priority queue as recommended at:
    https://docs.python.org/3/library/queue.html#queue.PriorityQueue
    """
    priority: float
    data: Any=field(compare=False)

    def __init__(self, priority: float, data: int = 0):
        self.priority = priority
        self.data = data
def writer_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    num_examples: int = 5) -> None:
    """
    Write data to the priority queue with random priorities
    The final item placed in the priority queue will have a priority of 999.0
    (least priority) with data of 12345
    """
    for i in range(num_examples-1):
        # priority_number: float = round(np.random.normal(), 3)
        a = [-0.954,-1.3,-0.785,-1.324,0.576,1.909,-0.296,0.781,-0.209,-0.492]
        priority_number = a[i]
        print(f"Placing into the PriorityQueue: {priority_number}")
        priority_queue_under_test.put(PrioritizedItem(priority_number, 0))
    priority_queue_under_test.put(PrioritizedItem(999.0, 12345)) # Signal end of queue
def reader_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    queue_of_priorities: Queue[float],
                    queue_of_data: Queue[int]):
    """
    Reads values from the priority_queue_under_test and then places the read
    priorities and data into queue_of_priorities and queue_of_data, respectively.
    Will read until it sees data == 12345
    """
    while True:
        if not priority_queue_under_test.empty():
            prioritized_item: PrioritizedItem = priority_queue_under_test.get()
            p = prioritized_item.priority
            d = prioritized_item.data
            # print(f"Read priority {p}")
            queue_of_priorities.put(p)
            queue_of_data.put(d)
            if d == 12345:
                return
if __name__ == "__main__":
    NUM_EXAMPLES: int = 10
    # Make Queues
    manager = Manager()
    # Entries are typically tuples of the form: (priority number, data)
    # Lowest priority number gets popped first
    priority_queue_under_test = manager.PriorityQueue()
    queue_of_priorities = manager.Queue()
    queue_of_data = manager.Queue()
    # Create processes
    writer_process = Process(target=writer_function, args=(priority_queue_under_test,
                                                           NUM_EXAMPLES,))
    reader_process = Process(target=reader_function, args=(priority_queue_under_test,
                                                           queue_of_priorities,
                                                           queue_of_data,))
    # Start processes
    writer_process.start()
    reader_process.start()
    # Wait for processes to end
    writer_process.join()
    reader_process.join()
    # Read priorities and data from the queues
    priorities = []
    data = []
    while not queue_of_priorities.empty():
        priorities.append(queue_of_priorities.get())
        data.append(queue_of_data.get())
        # print(f"Read priority {priorities[-1]}")
    print(f"Priorities of the items in the PriorityQueue as they were popped "
          f"off of the queue via .get():\n\t{priorities}")
    print(f"Data contained in the PriorityQueue:\n\t{data}")
    print(f"Number of items: {len(data)}")
    # Ensure that the queues have been fully read from and the
    # priorities read off of the priority queue are sorted low to high
    assert queue_of_priorities.empty()
    assert queue_of_data.empty()
    assert len(priorities) == len(data) == NUM_EXAMPLES
    assert sorted(priorities) == priorities
    print("All checks have passed successfully!")
Good, expected output (~70% of the time):
# python3 test_priority_queue.py
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
[-1.324, -1.3, -0.954, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
All checks have passed successfully!
Bad, not completely sorted output (~30% of the time):
# python3 test_priority_queue.py
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
[-0.954, -1.324, -1.3, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
Traceback (most recent call last):
File "test_priority_queue.py", line 129, in <module>
assert sorted(priorities) == priorities
AssertionError