A thread-safe priority queue keeping its elements unique
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
9
down vote
favorite
This class is intended to be used in situations where queue.Queue
does not suffice, because the following two properties are needed.
- Every item in the queue can have a priority assigned.
- No item can occur more than once in the queue.
So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.
"""A thread-safe priority queue keeping its elements unique
"""
import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar
T = TypeVar('T')
class OrderedSetPriorityQueue(Generic[T]):
"""A thread-safe priority queue keeping its elements unique"""
def __init__(self) -> None:
self.deques: DefaultDict[int, Deque[T]] =
collections.defaultdict(collections.deque)
self.elem_sets: DefaultDict[int, Set[T]] =
collections.defaultdict(set)
self.condition_var = threading.Condition(threading.RLock())
def contains(self, item: T) -> bool:
"""Check if the item is already queued."""
with self.condition_var:
for elem_set in self.elem_sets.values():
if item in elem_set:
return True
return False
def contains_with_prio(self, item: T, priority: int) -> bool:
"""Check if the item is already queued with this exact priority."""
with self.condition_var:
if priority not in self.elem_sets:
return False
return item in self.elem_sets[priority]
def remove(self, item: T) -> bool:
"""Remove an item from the queue, disregarding its stored priority."""
with self.condition_var:
if not self.contains(item):
return False
removed_count = 0
for set_prio, elem_set in self.elem_sets.items():
if item in elem_set:
self.deques[set_prio].remove(item)
elem_set.remove(item)
removed_count += 1
assert removed_count in [0, 1]
self._clean_up()
return removed_count > 0
def put(self, item: T, priority: int = 0) -> bool:
"""Returns False if item already is queued with the same priority.
If is has not been queued yet, it is added and True is returned.
If it already was queued but with a different priority,
the entry with the old priority will be removed,
and the new one is added."""
with self.condition_var:
if self.contains_with_prio(item, priority):
return False
if self.contains(item):
self.remove(item)
self.deques[priority].appendleft(item)
self.elem_sets[priority].add(item)
self.condition_var.notify()
return True
def empty(self) -> bool:
"""Return True if the queue is completely empty."""
with self.condition_var:
assert bool(self.elem_sets) == bool(self.deques)
return not self.elem_sets
def size(self) -> int:
"""Number of elements in the queue."""
with self.condition_var:
return sum(map(len, self.elem_sets.values()))
def get(self) -> T:
"""Pop the oldest item from the highest priority."""
with self.condition_var:
while not self.elem_sets:
self.condition_var.wait()
priority = sorted(self.deques.keys())[-1]
item = self.deques[priority].pop()
self.elem_sets[priority].remove(item)
self._clean_up()
return item
def _clean_up(self) -> None:
"""Internal function used to clean up unused data structures."""
with self.condition_var:
assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
priorities = list(self.elem_sets.keys())
for priority in priorities:
if not self.deques[priority]:
del self.deques[priority]
if not self.elem_sets[priority]:
del self.elem_sets[priority]
class OrderedSetPriorityQueueTest(unittest.TestCase):
"""
Verify integrity of custom queue implementation
"""
def test_get_and_put(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_order(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(2)
queue.put(1)
self.assertEqual(0, queue.get())
self.assertEqual(2, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_increase_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
queue.put(0, 2)
self.assertEqual(2, queue.size())
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_decrease_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 1)
queue.put(1, 1)
queue.put(0, 0)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_uniqueness(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(0)
queue.put(1)
queue.put(0)
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_get_with_wait(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
def add_to_queue() -> None:
time.sleep(0.2)
queue.put(0)
thread = threading.Thread(target=add_to_queue, args=(), kwargs=)
thread.start()
self.assertEqual(0, queue.get())
thread.join()
self.assertTrue(queue.empty())
python python-3.x multithreading thread-safety queue
 |Â
show 3 more comments
up vote
9
down vote
favorite
This class is intended to be used in situations where queue.Queue
does not suffice, because the following two properties are needed.
- Every item in the queue can have a priority assigned.
- No item can occur more than once in the queue.
So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.
"""A thread-safe priority queue keeping its elements unique
"""
import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar
T = TypeVar('T')
class OrderedSetPriorityQueue(Generic[T]):
"""A thread-safe priority queue keeping its elements unique"""
def __init__(self) -> None:
self.deques: DefaultDict[int, Deque[T]] =
collections.defaultdict(collections.deque)
self.elem_sets: DefaultDict[int, Set[T]] =
collections.defaultdict(set)
self.condition_var = threading.Condition(threading.RLock())
def contains(self, item: T) -> bool:
"""Check if the item is already queued."""
with self.condition_var:
for elem_set in self.elem_sets.values():
if item in elem_set:
return True
return False
def contains_with_prio(self, item: T, priority: int) -> bool:
"""Check if the item is already queued with this exact priority."""
with self.condition_var:
if priority not in self.elem_sets:
return False
return item in self.elem_sets[priority]
def remove(self, item: T) -> bool:
"""Remove an item from the queue, disregarding its stored priority."""
with self.condition_var:
if not self.contains(item):
return False
removed_count = 0
for set_prio, elem_set in self.elem_sets.items():
if item in elem_set:
self.deques[set_prio].remove(item)
elem_set.remove(item)
removed_count += 1
assert removed_count in [0, 1]
self._clean_up()
return removed_count > 0
def put(self, item: T, priority: int = 0) -> bool:
"""Returns False if item already is queued with the same priority.
If is has not been queued yet, it is added and True is returned.
If it already was queued but with a different priority,
the entry with the old priority will be removed,
and the new one is added."""
with self.condition_var:
if self.contains_with_prio(item, priority):
return False
if self.contains(item):
self.remove(item)
self.deques[priority].appendleft(item)
self.elem_sets[priority].add(item)
self.condition_var.notify()
return True
def empty(self) -> bool:
"""Return True if the queue is completely empty."""
with self.condition_var:
assert bool(self.elem_sets) == bool(self.deques)
return not self.elem_sets
def size(self) -> int:
"""Number of elements in the queue."""
with self.condition_var:
return sum(map(len, self.elem_sets.values()))
def get(self) -> T:
"""Pop the oldest item from the highest priority."""
with self.condition_var:
while not self.elem_sets:
self.condition_var.wait()
priority = sorted(self.deques.keys())[-1]
item = self.deques[priority].pop()
self.elem_sets[priority].remove(item)
self._clean_up()
return item
def _clean_up(self) -> None:
"""Internal function used to clean up unused data structures."""
with self.condition_var:
assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
priorities = list(self.elem_sets.keys())
for priority in priorities:
if not self.deques[priority]:
del self.deques[priority]
if not self.elem_sets[priority]:
del self.elem_sets[priority]
class OrderedSetPriorityQueueTest(unittest.TestCase):
"""
Verify integrity of custom queue implementation
"""
def test_get_and_put(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_order(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(2)
queue.put(1)
self.assertEqual(0, queue.get())
self.assertEqual(2, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_increase_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
queue.put(0, 2)
self.assertEqual(2, queue.size())
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_decrease_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 1)
queue.put(1, 1)
queue.put(0, 0)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_uniqueness(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(0)
queue.put(1)
queue.put(0)
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_get_with_wait(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
def add_to_queue() -> None:
time.sleep(0.2)
queue.put(0)
thread = threading.Thread(target=add_to_queue, args=(), kwargs=)
thread.start()
self.assertEqual(0, queue.get())
thread.join()
self.assertTrue(queue.empty())
python python-3.x multithreading thread-safety queue
You're aware of thePriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
– C. Harley
Aug 28 at 8:39
@C.Harley Yes, I knowPriorityQueue
. But how can one check if an item is already present in it?
– Tobias Hermann
Aug 28 at 10:19
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
Ah, so you mean maintaining additional sets for those checks likeself.elem_sets
in the current implementation.put
would check if the item is already in the set before adding it to the wrapped queue. Andget
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
– Tobias Hermann
Aug 30 at 9:21
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24
 |Â
show 3 more comments
up vote
9
down vote
favorite
up vote
9
down vote
favorite
This class is intended to be used in situations where queue.Queue
does not suffice, because the following two properties are needed.
- Every item in the queue can have a priority assigned.
- No item can occur more than once in the queue.
So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.
"""A thread-safe priority queue keeping its elements unique
"""
import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar
T = TypeVar('T')
class OrderedSetPriorityQueue(Generic[T]):
"""A thread-safe priority queue keeping its elements unique"""
def __init__(self) -> None:
self.deques: DefaultDict[int, Deque[T]] =
collections.defaultdict(collections.deque)
self.elem_sets: DefaultDict[int, Set[T]] =
collections.defaultdict(set)
self.condition_var = threading.Condition(threading.RLock())
def contains(self, item: T) -> bool:
"""Check if the item is already queued."""
with self.condition_var:
for elem_set in self.elem_sets.values():
if item in elem_set:
return True
return False
def contains_with_prio(self, item: T, priority: int) -> bool:
"""Check if the item is already queued with this exact priority."""
with self.condition_var:
if priority not in self.elem_sets:
return False
return item in self.elem_sets[priority]
def remove(self, item: T) -> bool:
"""Remove an item from the queue, disregarding its stored priority."""
with self.condition_var:
if not self.contains(item):
return False
removed_count = 0
for set_prio, elem_set in self.elem_sets.items():
if item in elem_set:
self.deques[set_prio].remove(item)
elem_set.remove(item)
removed_count += 1
assert removed_count in [0, 1]
self._clean_up()
return removed_count > 0
def put(self, item: T, priority: int = 0) -> bool:
"""Returns False if item already is queued with the same priority.
If is has not been queued yet, it is added and True is returned.
If it already was queued but with a different priority,
the entry with the old priority will be removed,
and the new one is added."""
with self.condition_var:
if self.contains_with_prio(item, priority):
return False
if self.contains(item):
self.remove(item)
self.deques[priority].appendleft(item)
self.elem_sets[priority].add(item)
self.condition_var.notify()
return True
def empty(self) -> bool:
"""Return True if the queue is completely empty."""
with self.condition_var:
assert bool(self.elem_sets) == bool(self.deques)
return not self.elem_sets
def size(self) -> int:
"""Number of elements in the queue."""
with self.condition_var:
return sum(map(len, self.elem_sets.values()))
def get(self) -> T:
"""Pop the oldest item from the highest priority."""
with self.condition_var:
while not self.elem_sets:
self.condition_var.wait()
priority = sorted(self.deques.keys())[-1]
item = self.deques[priority].pop()
self.elem_sets[priority].remove(item)
self._clean_up()
return item
def _clean_up(self) -> None:
"""Internal function used to clean up unused data structures."""
with self.condition_var:
assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
priorities = list(self.elem_sets.keys())
for priority in priorities:
if not self.deques[priority]:
del self.deques[priority]
if not self.elem_sets[priority]:
del self.elem_sets[priority]
class OrderedSetPriorityQueueTest(unittest.TestCase):
"""
Verify integrity of custom queue implementation
"""
def test_get_and_put(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_order(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(2)
queue.put(1)
self.assertEqual(0, queue.get())
self.assertEqual(2, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_increase_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
queue.put(0, 2)
self.assertEqual(2, queue.size())
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_decrease_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 1)
queue.put(1, 1)
queue.put(0, 0)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_uniqueness(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(0)
queue.put(1)
queue.put(0)
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_get_with_wait(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
def add_to_queue() -> None:
time.sleep(0.2)
queue.put(0)
thread = threading.Thread(target=add_to_queue, args=(), kwargs=)
thread.start()
self.assertEqual(0, queue.get())
thread.join()
self.assertTrue(queue.empty())
python python-3.x multithreading thread-safety queue
This class is intended to be used in situations where queue.Queue
does not suffice, because the following two properties are needed.
- Every item in the queue can have a priority assigned.
- No item can occur more than once in the queue.
So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.
"""A thread-safe priority queue keeping its elements unique
"""
import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar
T = TypeVar('T')
class OrderedSetPriorityQueue(Generic[T]):
"""A thread-safe priority queue keeping its elements unique"""
def __init__(self) -> None:
self.deques: DefaultDict[int, Deque[T]] =
collections.defaultdict(collections.deque)
self.elem_sets: DefaultDict[int, Set[T]] =
collections.defaultdict(set)
self.condition_var = threading.Condition(threading.RLock())
def contains(self, item: T) -> bool:
"""Check if the item is already queued."""
with self.condition_var:
for elem_set in self.elem_sets.values():
if item in elem_set:
return True
return False
def contains_with_prio(self, item: T, priority: int) -> bool:
"""Check if the item is already queued with this exact priority."""
with self.condition_var:
if priority not in self.elem_sets:
return False
return item in self.elem_sets[priority]
def remove(self, item: T) -> bool:
"""Remove an item from the queue, disregarding its stored priority."""
with self.condition_var:
if not self.contains(item):
return False
removed_count = 0
for set_prio, elem_set in self.elem_sets.items():
if item in elem_set:
self.deques[set_prio].remove(item)
elem_set.remove(item)
removed_count += 1
assert removed_count in [0, 1]
self._clean_up()
return removed_count > 0
def put(self, item: T, priority: int = 0) -> bool:
"""Returns False if item already is queued with the same priority.
If is has not been queued yet, it is added and True is returned.
If it already was queued but with a different priority,
the entry with the old priority will be removed,
and the new one is added."""
with self.condition_var:
if self.contains_with_prio(item, priority):
return False
if self.contains(item):
self.remove(item)
self.deques[priority].appendleft(item)
self.elem_sets[priority].add(item)
self.condition_var.notify()
return True
def empty(self) -> bool:
"""Return True if the queue is completely empty."""
with self.condition_var:
assert bool(self.elem_sets) == bool(self.deques)
return not self.elem_sets
def size(self) -> int:
"""Number of elements in the queue."""
with self.condition_var:
return sum(map(len, self.elem_sets.values()))
def get(self) -> T:
"""Pop the oldest item from the highest priority."""
with self.condition_var:
while not self.elem_sets:
self.condition_var.wait()
priority = sorted(self.deques.keys())[-1]
item = self.deques[priority].pop()
self.elem_sets[priority].remove(item)
self._clean_up()
return item
def _clean_up(self) -> None:
"""Internal function used to clean up unused data structures."""
with self.condition_var:
assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
priorities = list(self.elem_sets.keys())
for priority in priorities:
if not self.deques[priority]:
del self.deques[priority]
if not self.elem_sets[priority]:
del self.elem_sets[priority]
class OrderedSetPriorityQueueTest(unittest.TestCase):
"""
Verify integrity of custom queue implementation
"""
def test_get_and_put(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_order(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(2)
queue.put(1)
self.assertEqual(0, queue.get())
self.assertEqual(2, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_increase_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 0)
queue.put(1, 1)
queue.put(0, 2)
self.assertEqual(2, queue.size())
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_decrease_priority(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0, 1)
queue.put(1, 1)
queue.put(0, 0)
self.assertEqual(1, queue.get())
self.assertEqual(0, queue.get())
self.assertTrue(queue.empty())
def test_uniqueness(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
queue.put(0)
queue.put(0)
queue.put(1)
queue.put(0)
self.assertEqual(0, queue.get())
self.assertEqual(1, queue.get())
self.assertTrue(queue.empty())
def test_get_with_wait(self) -> None:
queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
def add_to_queue() -> None:
time.sleep(0.2)
queue.put(0)
thread = threading.Thread(target=add_to_queue, args=(), kwargs=)
thread.start()
self.assertEqual(0, queue.get())
thread.join()
self.assertTrue(queue.empty())
python python-3.x multithreading thread-safety queue
edited Aug 24 at 14:25
asked Aug 24 at 11:46


Tobias Hermann
390217
390217
You're aware of thePriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
– C. Harley
Aug 28 at 8:39
@C.Harley Yes, I knowPriorityQueue
. But how can one check if an item is already present in it?
– Tobias Hermann
Aug 28 at 10:19
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
Ah, so you mean maintaining additional sets for those checks likeself.elem_sets
in the current implementation.put
would check if the item is already in the set before adding it to the wrapped queue. Andget
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
– Tobias Hermann
Aug 30 at 9:21
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24
 |Â
show 3 more comments
You're aware of thePriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
– C. Harley
Aug 28 at 8:39
@C.Harley Yes, I knowPriorityQueue
. But how can one check if an item is already present in it?
– Tobias Hermann
Aug 28 at 10:19
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
Ah, so you mean maintaining additional sets for those checks likeself.elem_sets
in the current implementation.put
would check if the item is already in the set before adding it to the wrapped queue. Andget
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
– Tobias Hermann
Aug 30 at 9:21
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24
You're aware of the
PriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.– C. Harley
Aug 28 at 8:39
You're aware of the
PriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.– C. Harley
Aug 28 at 8:39
@C.Harley Yes, I know
PriorityQueue
. But how can one check if an item is already present in it?– Tobias Hermann
Aug 28 at 10:19
@C.Harley Yes, I know
PriorityQueue
. But how can one check if an item is already present in it?– Tobias Hermann
Aug 28 at 10:19
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
Ah, so you mean maintaining additional sets for those checks like
self.elem_sets
in the current implementation. put
would check if the item is already in the set before adding it to the wrapped queue. And get
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.– Tobias Hermann
Aug 30 at 9:21
Ah, so you mean maintaining additional sets for those checks like
self.elem_sets
in the current implementation. put
would check if the item is already in the set before adding it to the wrapped queue. And get
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.– Tobias Hermann
Aug 30 at 9:21
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24
 |Â
show 3 more comments
1 Answer
1
active
oldest
votes
up vote
10
down vote
accepted
Let's start with the good:
- Ohh recent Python 3, nice!
- Good documentation
- Good use of
typing
. This is great for datastructures. - Good formatting of your code. Nice spacing. Decent naming
- Good use of
defaultdict
- Good use of
assert
s
Things I noticed along the way:
contains
is usually spelled__contains__
. This way you can doitem in queue
.- I'd rename
contains_with_prio
. I'm not sure if I would call itcontains
, butqueue.contains(item, priority=5)
reads better to me. At the least, don't abbreviatepriority
. - I'd name
condition_var
to justlock
.with self.lock
reads better this way. - If you name it
remove
for parity with how the builtinset
does it, I'd have itraise KeyError
ifitem
isn't in the queue instead of returning abool
. If you want to return abool
, then name itdiscard
(also for parity). - I'd do
assert removed_count in (0, 1)
(tuples are preferred when you don't need to modify the object) - Instead of
empty
define a__bool__
which returnsTrue
if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty - Rename
size
to__len__
so you can dolen(queue)
- Rename
get
topop
(you even call it this in the comment) for parity with sets (and python's heap implementation) - For imports, I generally prefer
from unittest import TestCase
. Repeating the module name tends to be tedious and in many cases doesn't add much. - Make sure your tests are in a separate file (if the source is in
foo/ordered_set_priority_queue.py
you'd typically place them intest/foo/test_ordered_set_priority_queue.py
; this allows you--assuming you've added all the__init__.py
s--to dopython3 -m unittest discover .
to run all tests) queue.put(0, 1)
is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer:queue.put(0, priority=1)
- Using
Thread
in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns. - One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your
get
you dosorted(self.deques.keys())[-1]
. This isn log n
and also inefficient because it sorts the whole list (when you only need the first element). It also requires building a list in memory. Consider using python heaps to maintain a sorted heap of priorities. This way yourget
is always constant time. - Generally we prefix private attributes with
_
, soself._deques
,self._elem_sets
(call thisself._item_sets
, don't abbreviate),self._lock
, etc. - There's no need to do
priorities = list(self.elem_sets.keys())
in_clean_up()
- I'd benchmark to see if
_clean_up
is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff) - For parity with Python sets rename
put
toinsert
Here's a big detail:
The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False
, but if you separately manage deques
and elem_sets
then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys()
and self._item_sets.keys()
). You can fix this be grouping a deque and set together into a PriorityLevel
.
class PriorityLevel(Generic[T]):
def __init__(self) -> None:
self._ordered_items: Deque[T] = deque()
self._items_set: Set[T] =
def __contains__(self, item: T) -> bool:
return item in self._items_set
def pop(self) -> T:
item = self._ordered_items.pop()
self._items_set.remove(item)
return item
# etc.
Then have your datastructure maintain one defaultdict
of these:
class OrderedSetPriorityQueue(Generic[T]):
def __init__(self) -> None:
self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(PriorityLevel)
def __contains__(self, item: T) -> bool:
with self._lock:
return any(item in level for level in self._levels.values())
def insert(self, item, *, priority: int = 0) -> bool:
with self._lock:
self.discard(item)
return self._levels[priority].insert(item)
def discard(self, item: T) -> bool:
with self._lock:
return any(level.discard(item) for level in self._levels.values())
# etc.
In this way you never have to have a priority
and then hope that self.deques[priority]
and self.elem_sets[priority]
gives you what you want (and they haven't gotten out of sync).
You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel
may also allow you to do per-level locking, which may be more efficient than global locking.
Also, don't check if item in self
before doing a discard
for example. It's just needless work. Do the discard
and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify
, which I suspect stems from this).
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use aPriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop_clean_up
and makepop
O(1)
. Also I'll check out thereaderwriterlock
library. The reason why I dopriorities = list(self._elem_sets.keys())
instead of directly usingfor priority in self._elem_sets.keys():
isRuntimeError: dictionary changed size during iteration
. ;)
– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage ofany
for the implementation of__contains__
:return any(item in elem_set for elem_set in self.elem_set.values())
.
– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
add a comment |Â
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
10
down vote
accepted
Let's start with the good:
- Ohh recent Python 3, nice!
- Good documentation
- Good use of
typing
. This is great for datastructures. - Good formatting of your code. Nice spacing. Decent naming
- Good use of
defaultdict
- Good use of
assert
s
Things I noticed along the way:
contains
is usually spelled__contains__
. This way you can doitem in queue
.- I'd rename
contains_with_prio
. I'm not sure if I would call itcontains
, butqueue.contains(item, priority=5)
reads better to me. At the least, don't abbreviatepriority
. - I'd name
condition_var
to justlock
.with self.lock
reads better this way. - If you name it
remove
for parity with how the builtinset
does it, I'd have itraise KeyError
ifitem
isn't in the queue instead of returning abool
. If you want to return abool
, then name itdiscard
(also for parity). - I'd do
assert removed_count in (0, 1)
(tuples are preferred when you don't need to modify the object) - Instead of
empty
define a__bool__
which returnsTrue
if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty - Rename
size
to__len__
so you can dolen(queue)
- Rename
get
topop
(you even call it this in the comment) for parity with sets (and python's heap implementation) - For imports, I generally prefer
from unittest import TestCase
. Repeating the module name tends to be tedious and in many cases doesn't add much. - Make sure your tests are in a separate file (if the source is in
foo/ordered_set_priority_queue.py
you'd typically place them intest/foo/test_ordered_set_priority_queue.py
; this allows you--assuming you've added all the__init__.py
s--to dopython3 -m unittest discover .
to run all tests) queue.put(0, 1)
is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer:queue.put(0, priority=1)
- Using
Thread
in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns. - One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your
get
you dosorted(self.deques.keys())[-1]
. This isn log n
and also inefficient because it sorts the whole list (when you only need the first element). It also requires building a list in memory. Consider using python heaps to maintain a sorted heap of priorities. This way yourget
is always constant time. - Generally we prefix private attributes with
_
, soself._deques
,self._elem_sets
(call thisself._item_sets
, don't abbreviate),self._lock
, etc. - There's no need to do
priorities = list(self.elem_sets.keys())
in_clean_up()
- I'd benchmark to see if
_clean_up
is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff) - For parity with Python sets rename
put
toinsert
Here's a big detail:
The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False
, but if you separately manage deques
and elem_sets
then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys()
and self._item_sets.keys()
). You can fix this be grouping a deque and set together into a PriorityLevel
.
class PriorityLevel(Generic[T]):
def __init__(self) -> None:
self._ordered_items: Deque[T] = deque()
self._items_set: Set[T] =
def __contains__(self, item: T) -> bool:
return item in self._items_set
def pop(self) -> T:
item = self._ordered_items.pop()
self._items_set.remove(item)
return item
# etc.
Then have your datastructure maintain one defaultdict
of these:
class OrderedSetPriorityQueue(Generic[T]):
def __init__(self) -> None:
self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(PriorityLevel)
def __contains__(self, item: T) -> bool:
with self._lock:
return any(item in level for level in self._levels.values())
def insert(self, item, *, priority: int = 0) -> bool:
with self._lock:
self.discard(item)
return self._levels[priority].insert(item)
def discard(self, item: T) -> bool:
with self._lock:
return any(level.discard(item) for level in self._levels.values())
# etc.
In this way you never have to have a priority
and then hope that self.deques[priority]
and self.elem_sets[priority]
gives you what you want (and they haven't gotten out of sync).
You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel
may also allow you to do per-level locking, which may be more efficient than global locking.
Also, don't check if item in self
before doing a discard
for example. It's just needless work. Do the discard
and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify
, which I suspect stems from this).
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use aPriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop_clean_up
and makepop
O(1)
. Also I'll check out thereaderwriterlock
library. The reason why I dopriorities = list(self._elem_sets.keys())
instead of directly usingfor priority in self._elem_sets.keys():
isRuntimeError: dictionary changed size during iteration
. ;)
– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage ofany
for the implementation of__contains__
:return any(item in elem_set for elem_set in self.elem_set.values())
.
– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
add a comment |Â
up vote
10
down vote
accepted
Let's start with the good:
- Ohh recent Python 3, nice!
- Good documentation
- Good use of
typing
. This is great for datastructures. - Good formatting of your code. Nice spacing. Decent naming
- Good use of
defaultdict
- Good use of
assert
s
Things I noticed along the way:
contains
is usually spelled__contains__
. This way you can doitem in queue
.- I'd rename
contains_with_prio
. I'm not sure if I would call itcontains
, butqueue.contains(item, priority=5)
reads better to me. At the least, don't abbreviatepriority
. - I'd name
condition_var
to justlock
.with self.lock
reads better this way. - If you name it
remove
for parity with how the builtinset
does it, I'd have itraise KeyError
ifitem
isn't in the queue instead of returning abool
. If you want to return abool
, then name itdiscard
(also for parity). - I'd do
assert removed_count in (0, 1)
(tuples are preferred when you don't need to modify the object) - Instead of
empty
define a__bool__
which returnsTrue
if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty - Rename
size
to__len__
so you can dolen(queue)
- Rename
get
topop
(you even call it this in the comment) for parity with sets (and python's heap implementation) - For imports, I generally prefer
from unittest import TestCase
. Repeating the module name tends to be tedious and in many cases doesn't add much. - Make sure your tests are in a separate file (if the source is in
foo/ordered_set_priority_queue.py
you'd typically place them intest/foo/test_ordered_set_priority_queue.py
; this allows you--assuming you've added all the__init__.py
s--to dopython3 -m unittest discover .
to run all tests) queue.put(0, 1)
is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer:queue.put(0, priority=1)
- Using
Thread
in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns. - One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your
get
you dosorted(self.deques.keys())[-1]
. This isn log n
and also inefficient because it sorts the whole list (when you only need the first element). It also requires building a list in memory. Consider using python heaps to maintain a sorted heap of priorities. This way yourget
is always constant time. - Generally we prefix private attributes with
_
, soself._deques
,self._elem_sets
(call thisself._item_sets
, don't abbreviate),self._lock
, etc. - There's no need to do
priorities = list(self.elem_sets.keys())
in_clean_up()
- I'd benchmark to see if
_clean_up
is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff) - For parity with Python sets rename
put
toinsert
Here's a big detail:
The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False
, but if you separately manage deques
and elem_sets
then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys()
and self._item_sets.keys()
). You can fix this be grouping a deque and set together into a PriorityLevel
.
class PriorityLevel(Generic[T]):
def __init__(self) -> None:
self._ordered_items: Deque[T] = deque()
self._items_set: Set[T] =
def __contains__(self, item: T) -> bool:
return item in self._items_set
def pop(self) -> T:
item = self._ordered_items.pop()
self._items_set.remove(item)
return item
# etc.
Then have your datastructure maintain one defaultdict
of these:
class OrderedSetPriorityQueue(Generic[T]):
def __init__(self) -> None:
self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(PriorityLevel)
def __contains__(self, item: T) -> bool:
with self._lock:
return any(item in level for level in self._levels.values())
def insert(self, item, *, priority: int = 0) -> bool:
with self._lock:
self.discard(item)
return self._levels[priority].insert(item)
def discard(self, item: T) -> bool:
with self._lock:
return any(level.discard(item) for level in self._levels.values())
# etc.
In this way you never have to have a priority
and then hope that self.deques[priority]
and self.elem_sets[priority]
gives you what you want (and they haven't gotten out of sync).
You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel
may also allow you to do per-level locking, which may be more efficient than global locking.
Also, don't check if item in self
before doing a discard
for example. It's just needless work. Do the discard
and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify
, which I suspect stems from this).
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use aPriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop_clean_up
and makepop
O(1)
. Also I'll check out thereaderwriterlock
library. The reason why I dopriorities = list(self._elem_sets.keys())
instead of directly usingfor priority in self._elem_sets.keys():
isRuntimeError: dictionary changed size during iteration
. ;)
– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage ofany
for the implementation of__contains__
:return any(item in elem_set for elem_set in self.elem_set.values())
.
– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
add a comment |Â
up vote
10
down vote
accepted
up vote
10
down vote
accepted
Let's start with the good:
- Ohh recent Python 3, nice!
- Good documentation
- Good use of
typing
. This is great for datastructures. - Good formatting of your code. Nice spacing. Decent naming
- Good use of
defaultdict
- Good use of
assert
s
Things I noticed along the way:
contains
is usually spelled__contains__
. This way you can doitem in queue
.- I'd rename
contains_with_prio
. I'm not sure if I would call itcontains
, butqueue.contains(item, priority=5)
reads better to me. At the least, don't abbreviatepriority
. - I'd name
condition_var
to justlock
.with self.lock
reads better this way. - If you name it
remove
for parity with how the builtinset
does it, I'd have itraise KeyError
ifitem
isn't in the queue instead of returning abool
. If you want to return abool
, then name itdiscard
(also for parity). - I'd do
assert removed_count in (0, 1)
(tuples are preferred when you don't need to modify the object) - Instead of
empty
define a__bool__
which returnsTrue
if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty - Rename
size
to__len__
so you can dolen(queue)
- Rename
get
topop
(you even call it this in the comment) for parity with sets (and python's heap implementation) - For imports, I generally prefer
from unittest import TestCase
. Repeating the module name tends to be tedious and in many cases doesn't add much. - Make sure your tests are in a separate file (if the source is in
foo/ordered_set_priority_queue.py
you'd typically place them intest/foo/test_ordered_set_priority_queue.py
; this allows you--assuming you've added all the__init__.py
s--to dopython3 -m unittest discover .
to run all tests) queue.put(0, 1)
is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer:queue.put(0, priority=1)
- Using
Thread
in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns. - One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your
get
you dosorted(self.deques.keys())[-1]
. This isn log n
and also inefficient because it sorts the whole list (when you only need the first element). It also requires building a list in memory. Consider using python heaps to maintain a sorted heap of priorities. This way yourget
is always constant time. - Generally we prefix private attributes with
_
, soself._deques
,self._elem_sets
(call thisself._item_sets
, don't abbreviate),self._lock
, etc. - There's no need to do
priorities = list(self.elem_sets.keys())
in_clean_up()
- I'd benchmark to see if
_clean_up
is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff) - For parity with Python sets rename
put
toinsert
Here's a big detail:
The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False
, but if you separately manage deques
and elem_sets
then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys()
and self._item_sets.keys()
). You can fix this be grouping a deque and set together into a PriorityLevel
.
class PriorityLevel(Generic[T]):
def __init__(self) -> None:
self._ordered_items: Deque[T] = deque()
self._items_set: Set[T] =
def __contains__(self, item: T) -> bool:
return item in self._items_set
def pop(self) -> T:
item = self._ordered_items.pop()
self._items_set.remove(item)
return item
# etc.
Then have your datastructure maintain one defaultdict
of these:
class OrderedSetPriorityQueue(Generic[T]):
def __init__(self) -> None:
self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(PriorityLevel)
def __contains__(self, item: T) -> bool:
with self._lock:
return any(item in level for level in self._levels.values())
def insert(self, item, *, priority: int = 0) -> bool:
with self._lock:
self.discard(item)
return self._levels[priority].insert(item)
def discard(self, item: T) -> bool:
with self._lock:
return any(level.discard(item) for level in self._levels.values())
# etc.
In this way you never have to have a priority
and then hope that self.deques[priority]
and self.elem_sets[priority]
gives you what you want (and they haven't gotten out of sync).
You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel
may also allow you to do per-level locking, which may be more efficient than global locking.
Also, don't check if item in self
before doing a discard
for example. It's just needless work. Do the discard
and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify
, which I suspect stems from this).
Let's start with the good:
- Ohh recent Python 3, nice!
- Good documentation
- Good use of
typing
. This is great for datastructures. - Good formatting of your code. Nice spacing. Decent naming
- Good use of
defaultdict
- Good use of
assert
s
Things I noticed along the way:
contains
is usually spelled__contains__
. This way you can doitem in queue
.- I'd rename
contains_with_prio
. I'm not sure if I would call itcontains
, butqueue.contains(item, priority=5)
reads better to me. At the least, don't abbreviatepriority
. - I'd name
condition_var
to justlock
.with self.lock
reads better this way. - If you name it
remove
for parity with how the builtinset
does it, I'd have itraise KeyError
ifitem
isn't in the queue instead of returning abool
. If you want to return abool
, then name itdiscard
(also for parity). - I'd do
assert removed_count in (0, 1)
(tuples are preferred when you don't need to modify the object) - Instead of
empty
define a__bool__
which returnsTrue
if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty - Rename
size
to__len__
so you can dolen(queue)
- Rename
get
topop
(you even call it this in the comment) for parity with sets (and python's heap implementation) - For imports, I generally prefer
from unittest import TestCase
. Repeating the module name tends to be tedious and in many cases doesn't add much. - Make sure your tests are in a separate file (if the source is in
foo/ordered_set_priority_queue.py
you'd typically place them intest/foo/test_ordered_set_priority_queue.py
; this allows you--assuming you've added all the__init__.py
s--to dopython3 -m unittest discover .
to run all tests) queue.put(0, 1)
is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer:queue.put(0, priority=1)
- Using
Thread
in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns. - One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your
get
you dosorted(self.deques.keys())[-1]
. This isn log n
and also inefficient because it sorts the whole list (when you only need the first element). It also requires building a list in memory. Consider using python heaps to maintain a sorted heap of priorities. This way yourget
is always constant time. - Generally we prefix private attributes with
_
, soself._deques
,self._elem_sets
(call thisself._item_sets
, don't abbreviate),self._lock
, etc. - There's no need to do
priorities = list(self.elem_sets.keys())
in_clean_up()
- I'd benchmark to see if
_clean_up
is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff) - For parity with Python sets rename
put
toinsert
Here's a big detail:
The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys())
is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False
, but if you separately manage deques
and elem_sets
then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys()
and self._item_sets.keys()
). You can fix this be grouping a deque and set together into a PriorityLevel
.
class PriorityLevel(Generic[T]):
def __init__(self) -> None:
self._ordered_items: Deque[T] = deque()
self._items_set: Set[T] =
def __contains__(self, item: T) -> bool:
return item in self._items_set
def pop(self) -> T:
item = self._ordered_items.pop()
self._items_set.remove(item)
return item
# etc.
Then have your datastructure maintain one defaultdict
of these:
class OrderedSetPriorityQueue(Generic[T]):
def __init__(self) -> None:
self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(PriorityLevel)
def __contains__(self, item: T) -> bool:
with self._lock:
return any(item in level for level in self._levels.values())
def insert(self, item, *, priority: int = 0) -> bool:
with self._lock:
self.discard(item)
return self._levels[priority].insert(item)
def discard(self, item: T) -> bool:
with self._lock:
return any(level.discard(item) for level in self._levels.values())
# etc.
In this way you never have to have a priority
and then hope that self.deques[priority]
and self.elem_sets[priority]
gives you what you want (and they haven't gotten out of sync).
You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel
may also allow you to do per-level locking, which may be more efficient than global locking.
Also, don't check if item in self
before doing a discard
for example. It's just needless work. Do the discard
and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify
, which I suspect stems from this).
edited Aug 25 at 21:42
answered Aug 24 at 12:46
Bailey Parker
1,426913
1,426913
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use aPriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop_clean_up
and makepop
O(1)
. Also I'll check out thereaderwriterlock
library. The reason why I dopriorities = list(self._elem_sets.keys())
instead of directly usingfor priority in self._elem_sets.keys():
isRuntimeError: dictionary changed size during iteration
. ;)
– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage ofany
for the implementation of__contains__
:return any(item in elem_set for elem_set in self.elem_set.values())
.
– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
add a comment |Â
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use aPriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop_clean_up
and makepop
O(1)
. Also I'll check out thereaderwriterlock
library. The reason why I dopriorities = list(self._elem_sets.keys())
instead of directly usingfor priority in self._elem_sets.keys():
isRuntimeError: dictionary changed size during iteration
. ;)
– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage ofany
for the implementation of__contains__
:return any(item in elem_set for elem_set in self.elem_set.values())
.
– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a
PriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up
and make pop
O(1)
. Also I'll check out the readerwriterlock
library. The reason why I do priorities = list(self._elem_sets.keys())
instead of directly using for priority in self._elem_sets.keys():
is RuntimeError: dictionary changed size during iteration
. ;)– Tobias Hermann
Aug 24 at 13:17
Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a
PriorityLevel
class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up
and make pop
O(1)
. Also I'll check out the readerwriterlock
library. The reason why I do priorities = list(self._elem_sets.keys())
instead of directly using for priority in self._elem_sets.keys():
is RuntimeError: dictionary changed size during iteration
. ;)– Tobias Hermann
Aug 24 at 13:17
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove its needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20
1
1
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage of
any
for the implementation of __contains__
: return any(item in elem_set for elem_set in self.elem_set.values())
.– Mathias Ettinger
Aug 25 at 21:37
Nice review. I'd add two little things: a link to PEP3102 since you almost name it, and the usage of
any
for the implementation of __contains__
: return any(item in elem_set for elem_set in self.elem_set.values())
.– Mathias Ettinger
Aug 25 at 21:37
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40
add a comment |Â
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f202393%2fa-thread-safe-priority-queue-keeping-its-elements-unique%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
You're aware of the
PriorityQueue
yes? (from queue import PriorityQueue
) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.– C. Harley
Aug 28 at 8:39
@C.Harley Yes, I know
PriorityQueue
. But how can one check if an item is already present in it?– Tobias Hermann
Aug 28 at 10:19
You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16
Ah, so you mean maintaining additional sets for those checks like
self.elem_sets
in the current implementation.put
would check if the item is already in the set before adding it to the wrapped queue. Andget
would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.– Tobias Hermann
Aug 30 at 9:21
If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24