A thread-safe priority queue keeping its elements unique

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
9
down vote

favorite
1












This class is intended to be used in situations where queue.Queue does not suffice, because the following two properties are needed.



  • Every item in the queue can have a priority assigned.

  • No item can occur more than once in the queue.

So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.



"""A thread-safe priority queue keeping its elements unique
"""

import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar

T = TypeVar('T')


class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue keeping its elements unique.

    Items with a higher priority are served first; items sharing a priority
    are served in FIFO order. An item is queued at most once: putting it
    again with a different priority moves it to that priority.

    For every priority in use, ``deques[priority]`` keeps the items in
    insertion order (newest on the left) and ``elem_sets[priority]`` holds
    the same items for O(1) membership tests. Both mappings always share
    the same keys; empty levels are dropped by ``_clean_up``.
    """

    def __init__(self) -> None:
        self.deques: DefaultDict[int, Deque[T]] = collections.defaultdict(
            collections.deque)
        self.elem_sets: DefaultDict[int, Set[T]] = collections.defaultdict(
            set)
        # Reentrant lock: public methods call each other (put -> remove ->
        # _clean_up) while already holding it.
        self.condition_var = threading.Condition(threading.RLock())

    def contains(self, item: T) -> bool:
        """Check if the item is already queued, at any priority."""
        with self.condition_var:
            return any(item in elem_set
                       for elem_set in self.elem_sets.values())

    def contains_with_prio(self, item: T, priority: int) -> bool:
        """Check if the item is already queued with this exact priority."""
        with self.condition_var:
            # .get() instead of [] so the defaultdict does not grow an
            # empty level as a side effect of a read-only query.
            elem_set = self.elem_sets.get(priority)
            return elem_set is not None and item in elem_set

    def remove(self, item: T) -> bool:
        """Remove an item from the queue, disregarding its stored priority.

        Returns True if the item was queued (and has now been removed),
        False if it was not queued at all.
        """
        with self.condition_var:
            for priority, elem_set in self.elem_sets.items():
                if item in elem_set:
                    elem_set.remove(item)
                    self.deques[priority].remove(item)
                    # put() guarantees an item lives in at most one level.
                    break
            else:
                return False
            self._clean_up()
            return True

    def put(self, item: T, priority: int = 0) -> bool:
        """Queue ``item`` with ``priority``.

        Returns False if the item already is queued with the same priority;
        its position is left unchanged. Otherwise the item is added as the
        newest entry of ``priority`` and True is returned. If it was queued
        with a different priority, the old entry is removed first.
        """
        with self.condition_var:
            if self.contains_with_prio(item, priority):
                return False
            self.remove(item)  # no-op if the item is not queued
            self.deques[priority].appendleft(item)
            self.elem_sets[priority].add(item)
            self.condition_var.notify()  # wake one consumer blocked in get()
            return True

    def empty(self) -> bool:
        """Return True if the queue is completely empty."""
        with self.condition_var:
            assert bool(self.elem_sets) == bool(self.deques)
            return not self.elem_sets

    def size(self) -> int:
        """Number of elements in the queue."""
        with self.condition_var:
            return sum(map(len, self.elem_sets.values()))

    def get(self) -> T:
        """Pop the oldest item from the highest priority.

        Blocks until an item is available.
        """
        with self.condition_var:
            while not self.elem_sets:
                self.condition_var.wait()
            # max() is a single O(k) pass over the k priority levels; the
            # former sorted(...)[-1] sorted all keys just to read one.
            priority = max(self.deques)
            item = self.deques[priority].pop()  # oldest entry (right end)
            self.elem_sets[priority].remove(item)
            self._clean_up()
            return item

    def _clean_up(self) -> None:
        """Internal function dropping priority levels that became empty."""
        with self.condition_var:
            assert self.deques.keys() == self.elem_sets.keys()
            # Iterate over a snapshot: deleting keys while iterating the
            # dict itself raises "dictionary changed size during iteration".
            for priority in list(self.elem_sets):
                if not self.elem_sets[priority]:
                    assert not self.deques[priority]
                    del self.deques[priority]
                    del self.elem_sets[priority]


class OrderedSetPriorityQueueTest(unittest.TestCase):
    """
    Verify integrity of custom queue implementation
    """

    def test_get_and_put(self) -> None:
        """A single item goes in and comes out again."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_order(self) -> None:
        """Items of equal priority come out in FIFO order."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(2)
        queue.put(1)
        self.assertEqual(0, queue.get())
        self.assertEqual(2, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_priority(self) -> None:
        """Higher priorities are served first."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_increase_priority(self) -> None:
        """Re-putting with a higher priority moves the item forward."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        queue.put(0, priority=2)
        self.assertEqual(2, queue.size())
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_decrease_priority(self) -> None:
        """Re-putting with a lower priority moves the item back."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=1)
        queue.put(1, priority=1)
        queue.put(0, priority=0)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_uniqueness(self) -> None:
        """Putting an already-queued item does not duplicate it."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(0)
        queue.put(1)
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_put_return_value(self) -> None:
        """put reports whether the queue was modified."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        self.assertTrue(queue.put(0, priority=0))
        self.assertFalse(queue.put(0, priority=0))
        self.assertTrue(queue.put(0, priority=1))
        self.assertEqual(1, queue.size())

    def test_remove(self) -> None:
        """remove drops an item regardless of its priority."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertTrue(queue.remove(0))
        self.assertFalse(queue.remove(0))
        self.assertFalse(queue.contains(0))
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_get_with_wait(self) -> None:
        """get blocks until another thread puts an item."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()

        def add_to_queue() -> None:
            time.sleep(0.2)
            queue.put(0)

        thread = threading.Thread(target=add_to_queue)
        thread.start()
        self.assertEqual(0, queue.get())
        thread.join()
        self.assertTrue(queue.empty())






share|improve this question






















  • You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
    – C. Harley
    Aug 28 at 8:39










  • @C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
    – Tobias Hermann
    Aug 28 at 10:19










  • You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
    – C. Harley
    Aug 30 at 8:16










  • Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
    – Tobias Hermann
    Aug 30 at 9:21











  • If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
    – C. Harley
    Aug 31 at 2:24
















up vote
9
down vote

favorite
1












This class is intended to be used in situations where queue.Queue does not suffice, because the following two properties are needed.



  • Every item in the queue can have a priority assigned.

  • No item can occur more than once in the queue.

So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.



"""A thread-safe priority queue keeping its elements unique
"""

import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar

T = TypeVar('T')


class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue keeping its elements unique.

    Items with a higher priority are served first; items sharing a priority
    are served in FIFO order. An item is queued at most once: putting it
    again with a different priority moves it to that priority.

    For every priority in use, ``deques[priority]`` keeps the items in
    insertion order (newest on the left) and ``elem_sets[priority]`` holds
    the same items for O(1) membership tests. Both mappings always share
    the same keys; empty levels are dropped by ``_clean_up``.
    """

    def __init__(self) -> None:
        self.deques: DefaultDict[int, Deque[T]] = collections.defaultdict(
            collections.deque)
        self.elem_sets: DefaultDict[int, Set[T]] = collections.defaultdict(
            set)
        # Reentrant lock: public methods call each other (put -> remove ->
        # _clean_up) while already holding it.
        self.condition_var = threading.Condition(threading.RLock())

    def contains(self, item: T) -> bool:
        """Check if the item is already queued, at any priority."""
        with self.condition_var:
            return any(item in elem_set
                       for elem_set in self.elem_sets.values())

    def contains_with_prio(self, item: T, priority: int) -> bool:
        """Check if the item is already queued with this exact priority."""
        with self.condition_var:
            # .get() instead of [] so the defaultdict does not grow an
            # empty level as a side effect of a read-only query.
            elem_set = self.elem_sets.get(priority)
            return elem_set is not None and item in elem_set

    def remove(self, item: T) -> bool:
        """Remove an item from the queue, disregarding its stored priority.

        Returns True if the item was queued (and has now been removed),
        False if it was not queued at all.
        """
        with self.condition_var:
            for priority, elem_set in self.elem_sets.items():
                if item in elem_set:
                    elem_set.remove(item)
                    self.deques[priority].remove(item)
                    # put() guarantees an item lives in at most one level.
                    break
            else:
                return False
            self._clean_up()
            return True

    def put(self, item: T, priority: int = 0) -> bool:
        """Queue ``item`` with ``priority``.

        Returns False if the item already is queued with the same priority;
        its position is left unchanged. Otherwise the item is added as the
        newest entry of ``priority`` and True is returned. If it was queued
        with a different priority, the old entry is removed first.
        """
        with self.condition_var:
            if self.contains_with_prio(item, priority):
                return False
            self.remove(item)  # no-op if the item is not queued
            self.deques[priority].appendleft(item)
            self.elem_sets[priority].add(item)
            self.condition_var.notify()  # wake one consumer blocked in get()
            return True

    def empty(self) -> bool:
        """Return True if the queue is completely empty."""
        with self.condition_var:
            assert bool(self.elem_sets) == bool(self.deques)
            return not self.elem_sets

    def size(self) -> int:
        """Number of elements in the queue."""
        with self.condition_var:
            return sum(map(len, self.elem_sets.values()))

    def get(self) -> T:
        """Pop the oldest item from the highest priority.

        Blocks until an item is available.
        """
        with self.condition_var:
            while not self.elem_sets:
                self.condition_var.wait()
            # max() is a single O(k) pass over the k priority levels; the
            # former sorted(...)[-1] sorted all keys just to read one.
            priority = max(self.deques)
            item = self.deques[priority].pop()  # oldest entry (right end)
            self.elem_sets[priority].remove(item)
            self._clean_up()
            return item

    def _clean_up(self) -> None:
        """Internal function dropping priority levels that became empty."""
        with self.condition_var:
            assert self.deques.keys() == self.elem_sets.keys()
            # Iterate over a snapshot: deleting keys while iterating the
            # dict itself raises "dictionary changed size during iteration".
            for priority in list(self.elem_sets):
                if not self.elem_sets[priority]:
                    assert not self.deques[priority]
                    del self.deques[priority]
                    del self.elem_sets[priority]


class OrderedSetPriorityQueueTest(unittest.TestCase):
    """
    Verify integrity of custom queue implementation
    """

    def test_get_and_put(self) -> None:
        """A single item goes in and comes out again."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_order(self) -> None:
        """Items of equal priority come out in FIFO order."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(2)
        queue.put(1)
        self.assertEqual(0, queue.get())
        self.assertEqual(2, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_priority(self) -> None:
        """Higher priorities are served first."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_increase_priority(self) -> None:
        """Re-putting with a higher priority moves the item forward."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        queue.put(0, priority=2)
        self.assertEqual(2, queue.size())
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_decrease_priority(self) -> None:
        """Re-putting with a lower priority moves the item back."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=1)
        queue.put(1, priority=1)
        queue.put(0, priority=0)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_uniqueness(self) -> None:
        """Putting an already-queued item does not duplicate it."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(0)
        queue.put(1)
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_put_return_value(self) -> None:
        """put reports whether the queue was modified."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        self.assertTrue(queue.put(0, priority=0))
        self.assertFalse(queue.put(0, priority=0))
        self.assertTrue(queue.put(0, priority=1))
        self.assertEqual(1, queue.size())

    def test_remove(self) -> None:
        """remove drops an item regardless of its priority."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertTrue(queue.remove(0))
        self.assertFalse(queue.remove(0))
        self.assertFalse(queue.contains(0))
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_get_with_wait(self) -> None:
        """get blocks until another thread puts an item."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()

        def add_to_queue() -> None:
            time.sleep(0.2)
            queue.put(0)

        thread = threading.Thread(target=add_to_queue)
        thread.start()
        self.assertEqual(0, queue.get())
        thread.join()
        self.assertTrue(queue.empty())






share|improve this question






















  • You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
    – C. Harley
    Aug 28 at 8:39










  • @C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
    – Tobias Hermann
    Aug 28 at 10:19










  • You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
    – C. Harley
    Aug 30 at 8:16










  • Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
    – Tobias Hermann
    Aug 30 at 9:21











  • If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
    – C. Harley
    Aug 31 at 2:24












up vote
9
down vote

favorite
1









up vote
9
down vote

favorite
1






1





This class is intended to be used in situations where queue.Queue does not suffice, because the following two properties are needed.



  • Every item in the queue can have a priority assigned.

  • No item can occur more than once in the queue.

So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.



"""A thread-safe priority queue keeping its elements unique
"""

import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar

T = TypeVar('T')


class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue keeping its elements unique.

    Items with a higher priority are served first; items sharing a priority
    are served in FIFO order. An item is queued at most once: putting it
    again with a different priority moves it to that priority.

    For every priority in use, ``deques[priority]`` keeps the items in
    insertion order (newest on the left) and ``elem_sets[priority]`` holds
    the same items for O(1) membership tests. Both mappings always share
    the same keys; empty levels are dropped by ``_clean_up``.
    """

    def __init__(self) -> None:
        self.deques: DefaultDict[int, Deque[T]] = collections.defaultdict(
            collections.deque)
        self.elem_sets: DefaultDict[int, Set[T]] = collections.defaultdict(
            set)
        # Reentrant lock: public methods call each other (put -> remove ->
        # _clean_up) while already holding it.
        self.condition_var = threading.Condition(threading.RLock())

    def contains(self, item: T) -> bool:
        """Check if the item is already queued, at any priority."""
        with self.condition_var:
            return any(item in elem_set
                       for elem_set in self.elem_sets.values())

    def contains_with_prio(self, item: T, priority: int) -> bool:
        """Check if the item is already queued with this exact priority."""
        with self.condition_var:
            # .get() instead of [] so the defaultdict does not grow an
            # empty level as a side effect of a read-only query.
            elem_set = self.elem_sets.get(priority)
            return elem_set is not None and item in elem_set

    def remove(self, item: T) -> bool:
        """Remove an item from the queue, disregarding its stored priority.

        Returns True if the item was queued (and has now been removed),
        False if it was not queued at all.
        """
        with self.condition_var:
            for priority, elem_set in self.elem_sets.items():
                if item in elem_set:
                    elem_set.remove(item)
                    self.deques[priority].remove(item)
                    # put() guarantees an item lives in at most one level.
                    break
            else:
                return False
            self._clean_up()
            return True

    def put(self, item: T, priority: int = 0) -> bool:
        """Queue ``item`` with ``priority``.

        Returns False if the item already is queued with the same priority;
        its position is left unchanged. Otherwise the item is added as the
        newest entry of ``priority`` and True is returned. If it was queued
        with a different priority, the old entry is removed first.
        """
        with self.condition_var:
            if self.contains_with_prio(item, priority):
                return False
            self.remove(item)  # no-op if the item is not queued
            self.deques[priority].appendleft(item)
            self.elem_sets[priority].add(item)
            self.condition_var.notify()  # wake one consumer blocked in get()
            return True

    def empty(self) -> bool:
        """Return True if the queue is completely empty."""
        with self.condition_var:
            assert bool(self.elem_sets) == bool(self.deques)
            return not self.elem_sets

    def size(self) -> int:
        """Number of elements in the queue."""
        with self.condition_var:
            return sum(map(len, self.elem_sets.values()))

    def get(self) -> T:
        """Pop the oldest item from the highest priority.

        Blocks until an item is available.
        """
        with self.condition_var:
            while not self.elem_sets:
                self.condition_var.wait()
            # max() is a single O(k) pass over the k priority levels; the
            # former sorted(...)[-1] sorted all keys just to read one.
            priority = max(self.deques)
            item = self.deques[priority].pop()  # oldest entry (right end)
            self.elem_sets[priority].remove(item)
            self._clean_up()
            return item

    def _clean_up(self) -> None:
        """Internal function dropping priority levels that became empty."""
        with self.condition_var:
            assert self.deques.keys() == self.elem_sets.keys()
            # Iterate over a snapshot: deleting keys while iterating the
            # dict itself raises "dictionary changed size during iteration".
            for priority in list(self.elem_sets):
                if not self.elem_sets[priority]:
                    assert not self.deques[priority]
                    del self.deques[priority]
                    del self.elem_sets[priority]


class OrderedSetPriorityQueueTest(unittest.TestCase):
    """
    Verify integrity of custom queue implementation
    """

    def test_get_and_put(self) -> None:
        """A single item goes in and comes out again."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_order(self) -> None:
        """Items of equal priority come out in FIFO order."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(2)
        queue.put(1)
        self.assertEqual(0, queue.get())
        self.assertEqual(2, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_priority(self) -> None:
        """Higher priorities are served first."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_increase_priority(self) -> None:
        """Re-putting with a higher priority moves the item forward."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        queue.put(0, priority=2)
        self.assertEqual(2, queue.size())
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_decrease_priority(self) -> None:
        """Re-putting with a lower priority moves the item back."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=1)
        queue.put(1, priority=1)
        queue.put(0, priority=0)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_uniqueness(self) -> None:
        """Putting an already-queued item does not duplicate it."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(0)
        queue.put(1)
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_put_return_value(self) -> None:
        """put reports whether the queue was modified."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        self.assertTrue(queue.put(0, priority=0))
        self.assertFalse(queue.put(0, priority=0))
        self.assertTrue(queue.put(0, priority=1))
        self.assertEqual(1, queue.size())

    def test_remove(self) -> None:
        """remove drops an item regardless of its priority."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertTrue(queue.remove(0))
        self.assertFalse(queue.remove(0))
        self.assertFalse(queue.contains(0))
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_get_with_wait(self) -> None:
        """get blocks until another thread puts an item."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()

        def add_to_queue() -> None:
            time.sleep(0.2)
            queue.put(0)

        thread = threading.Thread(target=add_to_queue)
        thread.start()
        self.assertEqual(0, queue.get())
        thread.join()
        self.assertTrue(queue.empty())






share|improve this question














This class is intended to be used in situations where queue.Queue does not suffice, because the following two properties are needed.



  • Every item in the queue can have a priority assigned.

  • No item can occur more than once in the queue.

So here is my current implementation. I would be happy to hear your ideas on how the code can be improved.



"""A thread-safe priority queue keeping its elements unique
"""

import collections
import threading
import time
import unittest
from typing import DefaultDict, Deque, Set, Generic, TypeVar

T = TypeVar('T')


class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue keeping its elements unique.

    Items with a higher priority are served first; items sharing a priority
    are served in FIFO order. An item is queued at most once: putting it
    again with a different priority moves it to that priority.

    For every priority in use, ``deques[priority]`` keeps the items in
    insertion order (newest on the left) and ``elem_sets[priority]`` holds
    the same items for O(1) membership tests. Both mappings always share
    the same keys; empty levels are dropped by ``_clean_up``.
    """

    def __init__(self) -> None:
        self.deques: DefaultDict[int, Deque[T]] = collections.defaultdict(
            collections.deque)
        self.elem_sets: DefaultDict[int, Set[T]] = collections.defaultdict(
            set)
        # Reentrant lock: public methods call each other (put -> remove ->
        # _clean_up) while already holding it.
        self.condition_var = threading.Condition(threading.RLock())

    def contains(self, item: T) -> bool:
        """Check if the item is already queued, at any priority."""
        with self.condition_var:
            return any(item in elem_set
                       for elem_set in self.elem_sets.values())

    def contains_with_prio(self, item: T, priority: int) -> bool:
        """Check if the item is already queued with this exact priority."""
        with self.condition_var:
            # .get() instead of [] so the defaultdict does not grow an
            # empty level as a side effect of a read-only query.
            elem_set = self.elem_sets.get(priority)
            return elem_set is not None and item in elem_set

    def remove(self, item: T) -> bool:
        """Remove an item from the queue, disregarding its stored priority.

        Returns True if the item was queued (and has now been removed),
        False if it was not queued at all.
        """
        with self.condition_var:
            for priority, elem_set in self.elem_sets.items():
                if item in elem_set:
                    elem_set.remove(item)
                    self.deques[priority].remove(item)
                    # put() guarantees an item lives in at most one level.
                    break
            else:
                return False
            self._clean_up()
            return True

    def put(self, item: T, priority: int = 0) -> bool:
        """Queue ``item`` with ``priority``.

        Returns False if the item already is queued with the same priority;
        its position is left unchanged. Otherwise the item is added as the
        newest entry of ``priority`` and True is returned. If it was queued
        with a different priority, the old entry is removed first.
        """
        with self.condition_var:
            if self.contains_with_prio(item, priority):
                return False
            self.remove(item)  # no-op if the item is not queued
            self.deques[priority].appendleft(item)
            self.elem_sets[priority].add(item)
            self.condition_var.notify()  # wake one consumer blocked in get()
            return True

    def empty(self) -> bool:
        """Return True if the queue is completely empty."""
        with self.condition_var:
            assert bool(self.elem_sets) == bool(self.deques)
            return not self.elem_sets

    def size(self) -> int:
        """Number of elements in the queue."""
        with self.condition_var:
            return sum(map(len, self.elem_sets.values()))

    def get(self) -> T:
        """Pop the oldest item from the highest priority.

        Blocks until an item is available.
        """
        with self.condition_var:
            while not self.elem_sets:
                self.condition_var.wait()
            # max() is a single O(k) pass over the k priority levels; the
            # former sorted(...)[-1] sorted all keys just to read one.
            priority = max(self.deques)
            item = self.deques[priority].pop()  # oldest entry (right end)
            self.elem_sets[priority].remove(item)
            self._clean_up()
            return item

    def _clean_up(self) -> None:
        """Internal function dropping priority levels that became empty."""
        with self.condition_var:
            assert self.deques.keys() == self.elem_sets.keys()
            # Iterate over a snapshot: deleting keys while iterating the
            # dict itself raises "dictionary changed size during iteration".
            for priority in list(self.elem_sets):
                if not self.elem_sets[priority]:
                    assert not self.deques[priority]
                    del self.deques[priority]
                    del self.elem_sets[priority]


class OrderedSetPriorityQueueTest(unittest.TestCase):
    """
    Verify integrity of custom queue implementation
    """

    def test_get_and_put(self) -> None:
        """A single item goes in and comes out again."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_order(self) -> None:
        """Items of equal priority come out in FIFO order."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(2)
        queue.put(1)
        self.assertEqual(0, queue.get())
        self.assertEqual(2, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_priority(self) -> None:
        """Higher priorities are served first."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_increase_priority(self) -> None:
        """Re-putting with a higher priority moves the item forward."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        queue.put(0, priority=2)
        self.assertEqual(2, queue.size())
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_decrease_priority(self) -> None:
        """Re-putting with a lower priority moves the item back."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=1)
        queue.put(1, priority=1)
        queue.put(0, priority=0)
        self.assertEqual(1, queue.get())
        self.assertEqual(0, queue.get())
        self.assertTrue(queue.empty())

    def test_uniqueness(self) -> None:
        """Putting an already-queued item does not duplicate it."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0)
        queue.put(0)
        queue.put(1)
        queue.put(0)
        self.assertEqual(0, queue.get())
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_put_return_value(self) -> None:
        """put reports whether the queue was modified."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        self.assertTrue(queue.put(0, priority=0))
        self.assertFalse(queue.put(0, priority=0))
        self.assertTrue(queue.put(0, priority=1))
        self.assertEqual(1, queue.size())

    def test_remove(self) -> None:
        """remove drops an item regardless of its priority."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()
        queue.put(0, priority=0)
        queue.put(1, priority=1)
        self.assertTrue(queue.remove(0))
        self.assertFalse(queue.remove(0))
        self.assertFalse(queue.contains(0))
        self.assertEqual(1, queue.get())
        self.assertTrue(queue.empty())

    def test_get_with_wait(self) -> None:
        """get blocks until another thread puts an item."""
        queue: OrderedSetPriorityQueue[int] = OrderedSetPriorityQueue()

        def add_to_queue() -> None:
            time.sleep(0.2)
            queue.put(0)

        thread = threading.Thread(target=add_to_queue)
        thread.start()
        self.assertEqual(0, queue.get())
        thread.join()
        self.assertTrue(queue.empty())








share|improve this question













share|improve this question




share|improve this question








edited Aug 24 at 14:25

























asked Aug 24 at 11:46









Tobias Hermann

390217




390217











  • You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
    – C. Harley
    Aug 28 at 8:39










  • @C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
    – Tobias Hermann
    Aug 28 at 10:19










  • You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
    – C. Harley
    Aug 30 at 8:16










  • Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
    – Tobias Hermann
    Aug 30 at 9:21











  • If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
    – C. Harley
    Aug 31 at 2:24
















  • You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
    – C. Harley
    Aug 28 at 8:39










  • @C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
    – Tobias Hermann
    Aug 28 at 10:19










  • You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
    – C. Harley
    Aug 30 at 8:16










  • Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
    – Tobias Hermann
    Aug 30 at 9:21











  • If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
    – C. Harley
    Aug 31 at 2:24















You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
– C. Harley
Aug 28 at 8:39




You're aware of the PriorityQueue yes? (from queue import PriorityQueue) As for the constraint of no duplicate items, I guess you'll want to have a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue.
– C. Harley
Aug 28 at 8:39












@C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
– Tobias Hermann
Aug 28 at 10:19




@C.Harley Yes, I know PriorityQueue. But how can one check if an item is already present in it?
– Tobias Hermann
Aug 28 at 10:19












You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16




You'll want to create a wrapper function to perform a check/comparison before actually adding data items to the priorityqueue
– C. Harley
Aug 30 at 8:16












Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
– Tobias Hermann
Aug 30 at 9:21





Ah, so you mean maintaining additional sets for those checks like self.elem_sets in the current implementation. put would check if the item is already in the set before adding it to the wrapped queue. And get would remove the just-popped item from the set. Yes, seems to make sense. But I'm not yet sure how updating the priority of an item already in the queue would work.
– Tobias Hermann
Aug 30 at 9:21













If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24




If you're modifying items after they've been set, then your implementation is incorrectly assigning priority to begin with. Return to the drawing board and correct your design.
– C. Harley
Aug 31 at 2:24










1 Answer
1






active

oldest

votes

















up vote
10
down vote



accepted










Let's start with the good:



  • Ohh recent Python 3, nice!

  • Good documentation

  • Good use of typing. This is great for datastructures.

  • Good formatting of your code. Nice spacing. Decent naming

  • Good use of defaultdict

  • Good use of asserts

Things I noticed along the way:




  • contains is usually spelled __contains__. This way you can do item in queue.

  • I'd rename contains_with_prio. I'm not sure if I would call it contains, but queue.contains(item, priority=5) reads better to me. At the least, don't abbreviate priority.

  • I'd name condition_var to just lock. with self.lock reads better this way.

  • If you name it remove for parity with how the builtin set does it, I'd have it raise KeyError if item isn't in the queue instead of returning a bool. If you want to return a bool, then name it discard (also for parity).

  • I'd do assert removed_count in (0, 1) (tuples are preferred when you don't need to modify the object)

  • Instead of empty define a __bool__ which returns True if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty

  • Rename size to __len__ so you can do len(queue)

  • Rename get to pop (you even call it this in the comment) for parity with sets (and python's heap implementation)

  • For imports, I generally prefer from unittest import TestCase. Repeating the module name tends to be tedious and in many cases doesn't add much.

  • Make sure your tests are in a separate file (if the source is in foo/ordered_set_priority_queue.py you'd typically place them in test/foo/test_ordered_set_priority_queue.py; this allows you--assuming you've added all the __init__.pys--to do python3 -m unittest discover . to run all tests)


  • queue.put(0, 1) is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer: queue.put(0, priority=1)

  • Using Thread in a test is fairly fragile. Generally, testing concurrent datastructures is tricky business. This test may not always fail if there is a problem in your code unless you're very careful: https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures. You're probably better off introducing some special breakpoints in your code (removed out for production) that allows you to pause your datastructure so you can specifically craft degenerate access patterns.

  • One of the key parts of a priority queue is constant time access to the top of the queue. You actually don't have that because in your get you do sorted(self.deques.keys())[-1]. This is O(n log n) in the number of priority levels. It is also wasteful, because it sorts the whole list when you only need its largest element (max() would find that in a single O(n) pass). It also requires building a list in memory. Consider using Python's heapq module to maintain a heap of priorities (heapq is a min-heap, so store negated priorities). This way peeking at the highest priority is constant time, and adding or removing a priority level is O(log n).

  • Generally we prefix private attributes with _, so self._deques, self._elem_sets (call this self._item_sets, don't abbreviate), self._lock, etc.

  • There's no need to do priorities = list(self.elem_sets.keys()) in _clean_up()

  • I'd benchmark to see if _clean_up is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff)

  • For parity with Python sets rename put to insert

Here's a big detail:



The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys()) is an indication that you have a data dependency that could probably be refactored. Ideally, you'd never want that to be False, but if you separately manage deques and elem_sets then perhaps they could get out of sync. This also gives you multiple ways to get all of the priority levels (ex. self._deques.keys() and self._item_sets.keys()). You can fix this by grouping a deque and set together into a PriorityLevel.



class PriorityLevel(Generic[T]):
    """The items queued at a single priority, kept unique and in FIFO order.

    The deque remembers insertion order and the set answers membership in
    O(1). Both are only ever mutated together inside this class, which is
    what keeps them from drifting out of sync.
    """

    def __init__(self) -> None:
        self._ordered_items: Deque[T] = deque()
        self._items_set: Set[T] = set()

    def __contains__(self, item: T) -> bool:
        return item in self._items_set

    def __len__(self) -> int:
        return len(self._ordered_items)

    def insert(self, item: T) -> bool:
        """Append *item* to the back of this level.

        Returns False without modifying anything if *item* is already here.
        """
        if item in self._items_set:
            return False
        self._ordered_items.append(item)
        self._items_set.add(item)
        return True

    def discard(self, item: T) -> bool:
        """Remove *item* if present and return whether anything was removed."""
        if item not in self._items_set:
            return False
        self._items_set.remove(item)
        # deque.remove is a linear scan, but only over this one level.
        self._ordered_items.remove(item)
        return True

    def pop(self) -> T:
        """Remove and return the oldest item at this level.

        Raises IndexError if the level is empty.
        """
        # popleft (not pop): insert() appends on the right, so taking from
        # the left keeps equal-priority items first-in, first-out.
        item = self._ordered_items.popleft()
        self._items_set.remove(item)
        return item


Then have your datastructure maintain one defaultdict of these:



class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue in which every item occurs at most once.

    Each priority maps to a PriorityLevel that owns both the ordering and the
    membership data for that priority, so the two cannot get out of sync.
    """

    def __init__(self) -> None:
        self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(
            PriorityLevel
        )
        # Guards _levels. Must be reentrant: insert() calls discard() while
        # it already holds the lock, which would deadlock a plain Lock.
        self._lock = threading.RLock()

    def __contains__(self, item: T) -> bool:
        """Return True if *item* is queued at any priority."""
        with self._lock:
            return any(item in level for level in self._levels.values())

    def insert(self, item: T, *, priority: int = 0) -> bool:
        """Queue *item* at *priority*, first removing it from any other level.

        Returns the result of ``PriorityLevel.insert`` for the target level.
        """
        with self._lock:
            self.discard(item)
            return self._levels[priority].insert(item)

    def discard(self, item: T) -> bool:
        """Remove *item* from whichever level holds it; return whether it was found."""
        with self._lock:
            # insert() keeps items unique across levels, so stopping at the
            # first successful discard (any() short-circuits) is sufficient.
            return any(level.discard(item) for level in self._levels.values())

    # etc. (pop, __len__, __bool__, ...)


In this way you never have to have a priority and then hope that self.deques[priority] and self.elem_sets[priority] gives you what you want (and they haven't gotten out of sync).



You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel may also allow you to do per-level locking, which may be more efficient than global locking.



Also, don't check if item in self before doing a discard for example. It's just needless work. Do the discard and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify, which I suspect stems from this).






share|improve this answer






















  • Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
    – Tobias Hermann
    Aug 24 at 13:17










  • Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
    – Bailey Parker
    Aug 24 at 13:20






  • 1




    Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
    – Mathias Ettinger
    Aug 25 at 21:37










  • @MathiasEttinger Indeed, both of these are excellent suggestions :)
    – Bailey Parker
    Aug 25 at 21:40










Your Answer




// NOTE(review): Stack Exchange page-bootstrap script captured by the web
// scrape. Braces and object-literal delimiters were stripped during
// extraction (e.g. `var channelOptions =` is followed by bare `tags:` / `id:`
// lines), so this is not valid JavaScript as it stands.
StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

// Passes the answer-editor options below to StackExchange.prepareEditor.
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);













 

draft saved


draft discarded


















// NOTE(review): scraped Stack Exchange page script (braces stripped during
// extraction, so not valid JavaScript as-is). It passes a callback to
// StackExchange.ready that calls initPostLogin for '.new-post-login'.
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f202393%2fa-thread-safe-priority-queue-keeping-its-elements-unique%23new-answer', 'question_page');

);

Post as a guest






























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
10
down vote



accepted










Let's start with the good:



  • Ohh recent Python 3, nice!

  • Good documentation

  • Good use of typing. This is great for datastructures.

  • Good formatting of your code. Nice spacing. Decent naming

  • Good use of defaultdict

  • Good use of asserts

Things I noticed along the way:




  • contains is usually spelled __contains__. This way you can do item in queue.

  • I'd rename contains_with_prio. I'm not sure if I would call it contains, but queue.contains(item, priority=5) reads better to me. At the least, don't abbreviate priority.

  • I'd name condition_var to just lock. with self.lock reads better this way.

  • If you name it remove for parity with how the builtin set does it, I'd have it raise KeyError if item isn't in the queue instead of returning a bool. If you want to return a bool, then name it discard (also for parity).

  • I'd do assert removed_count in (0, 1) (tuples are preferred when you don't need to modify the object)

  • Instead of empty define a __bool__ which returns True if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty

  • Rename size to __len__ so you can do len(queue)

  • Rename get to pop (you even call it this in the comment) for parity with sets (and python's heap implementation)

  • For imports, I generally prefer from unittest import TestCase. Repeating the module name tends to be tedious and in many cases doesn't add much.

  • Make sure your tests are in a separate file (if the source is in foo/ordered_set_priority_queue.py you'd typically place them in test/foo/test_ordered_set_priority_queue.py; this allows you--assuming you've added all the __init__.pys--to do python3 -m unittest discover . to run all tests)


  • queue.put(0, 1) is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer: queue.put(0, priority=1)

  • Using Thread in a test is fairly fragile. Generally, testing concurrent data structures is tricky business: this test may not fail even when there is a problem in your code unless you're very careful (see https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures). You're probably better off introducing special breakpoints in your code (removed for production) that allow you to pause your data structure so you can deliberately craft degenerate access patterns.

  • One of the key parts of a priority queue is constant-time access to the top of the queue. You don't actually have that, because in your get you do sorted(self.deques.keys())[-1]. That is O(n log n) in the number of priority levels, and wasteful because it sorts the whole list when you only need the largest key; it also builds a new list in memory. Consider using Python's heapq module to maintain a heap of priorities (heapq is a min-heap, so push negated priorities if higher values should come out first). Peeking at the top priority is then O(1), and adding or removing a priority level is O(log n).

  • Generally we prefix private attributes with _, so self._deques, self._elem_sets (call this self._item_sets, don't abbreviate), self._lock, etc.

  • There's no need to do priorities = list(self.elem_sets.keys()) in _clean_up()

  • I'd benchmark to see if _clean_up is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff)

  • For parity with Python sets rename put to insert

Here's a big detail:



The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys()) indicates a data dependency that could probably be refactored away. That assertion should never fail, but because you manage deques and elem_sets separately, they could get out of sync. It also gives you multiple ways to get all of the priority levels (e.g. self._deques.keys() and self._item_sets.keys()). You can fix this by grouping a deque and a set together into a PriorityLevel.



class PriorityLevel(Generic[T]):
    """The items queued at a single priority, kept unique and in FIFO order.

    The deque remembers insertion order and the set answers membership in
    O(1). Both are only ever mutated together inside this class, which is
    what keeps them from drifting out of sync.
    """

    def __init__(self) -> None:
        self._ordered_items: Deque[T] = deque()
        self._items_set: Set[T] = set()

    def __contains__(self, item: T) -> bool:
        return item in self._items_set

    def __len__(self) -> int:
        return len(self._ordered_items)

    def insert(self, item: T) -> bool:
        """Append *item* to the back of this level.

        Returns False without modifying anything if *item* is already here.
        """
        if item in self._items_set:
            return False
        self._ordered_items.append(item)
        self._items_set.add(item)
        return True

    def discard(self, item: T) -> bool:
        """Remove *item* if present and return whether anything was removed."""
        if item not in self._items_set:
            return False
        self._items_set.remove(item)
        # deque.remove is a linear scan, but only over this one level.
        self._ordered_items.remove(item)
        return True

    def pop(self) -> T:
        """Remove and return the oldest item at this level.

        Raises IndexError if the level is empty.
        """
        # popleft (not pop): insert() appends on the right, so taking from
        # the left keeps equal-priority items first-in, first-out.
        item = self._ordered_items.popleft()
        self._items_set.remove(item)
        return item


Then have your datastructure maintain one defaultdict of these:



class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue in which every item occurs at most once.

    Each priority maps to a PriorityLevel that owns both the ordering and the
    membership data for that priority, so the two cannot get out of sync.
    """

    def __init__(self) -> None:
        self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(
            PriorityLevel
        )
        # Guards _levels. Must be reentrant: insert() calls discard() while
        # it already holds the lock, which would deadlock a plain Lock.
        self._lock = threading.RLock()

    def __contains__(self, item: T) -> bool:
        """Return True if *item* is queued at any priority."""
        with self._lock:
            return any(item in level for level in self._levels.values())

    def insert(self, item: T, *, priority: int = 0) -> bool:
        """Queue *item* at *priority*, first removing it from any other level.

        Returns the result of ``PriorityLevel.insert`` for the target level.
        """
        with self._lock:
            self.discard(item)
            return self._levels[priority].insert(item)

    def discard(self, item: T) -> bool:
        """Remove *item* from whichever level holds it; return whether it was found."""
        with self._lock:
            # insert() keeps items unique across levels, so stopping at the
            # first successful discard (any() short-circuits) is sufficient.
            return any(level.discard(item) for level in self._levels.values())

    # etc. (pop, __len__, __bool__, ...)


In this way you never have to have a priority and then hope that self.deques[priority] and self.elem_sets[priority] gives you what you want (and they haven't gotten out of sync).



You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel may also allow you to do per-level locking, which may be more efficient than global locking.



Also, don't check if item in self before doing a discard for example. It's just needless work. Do the discard and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify, which I suspect stems from this).






share|improve this answer






















  • Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
    – Tobias Hermann
    Aug 24 at 13:17










  • Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
    – Bailey Parker
    Aug 24 at 13:20






  • 1




    Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
    – Mathias Ettinger
    Aug 25 at 21:37










  • @MathiasEttinger Indeed, both of these are excellent suggestions :)
    – Bailey Parker
    Aug 25 at 21:40














up vote
10
down vote



accepted










Let's start with the good:



  • Ohh recent Python 3, nice!

  • Good documentation

  • Good use of typing. This is great for datastructures.

  • Good formatting of your code. Nice spacing. Decent naming

  • Good use of defaultdict

  • Good use of asserts

Things I noticed along the way:




  • contains is usually spelled __contains__. This way you can do item in queue.

  • I'd rename contains_with_prio. I'm not sure if I would call it contains, but queue.contains(item, priority=5) reads better to me. At the least, don't abbreviate priority.

  • I'd name condition_var to just lock. with self.lock reads better this way.

  • If you name it remove for parity with how the builtin set does it, I'd have it raise KeyError if item isn't in the queue instead of returning a bool. If you want to return a bool, then name it discard (also for parity).

  • I'd do assert removed_count in (0, 1) (tuples are preferred when you don't need to modify the object)

  • Instead of empty define a __bool__ which returns True if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty

  • Rename size to __len__ so you can do len(queue)

  • Rename get to pop (you even call it this in the comment) for parity with sets (and python's heap implementation)

  • For imports, I generally prefer from unittest import TestCase. Repeating the module name tends to be tedious and in many cases doesn't add much.

  • Make sure your tests are in a separate file (if the source is in foo/ordered_set_priority_queue.py you'd typically place them in test/foo/test_ordered_set_priority_queue.py; this allows you--assuming you've added all the __init__.pys--to do python3 -m unittest discover . to run all tests)


  • queue.put(0, 1) is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer: queue.put(0, priority=1)

  • Using Thread in a test is fairly fragile. Generally, testing concurrent data structures is tricky business: this test may not fail even when there is a problem in your code unless you're very careful (see https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures). You're probably better off introducing special breakpoints in your code (removed for production) that allow you to pause your data structure so you can deliberately craft degenerate access patterns.

  • One of the key parts of a priority queue is constant-time access to the top of the queue. You don't actually have that, because in your get you do sorted(self.deques.keys())[-1]. That is O(n log n) in the number of priority levels, and wasteful because it sorts the whole list when you only need the largest key; it also builds a new list in memory. Consider using Python's heapq module to maintain a heap of priorities (heapq is a min-heap, so push negated priorities if higher values should come out first). Peeking at the top priority is then O(1), and adding or removing a priority level is O(log n).

  • Generally we prefix private attributes with _, so self._deques, self._elem_sets (call this self._item_sets, don't abbreviate), self._lock, etc.

  • There's no need to do priorities = list(self.elem_sets.keys()) in _clean_up()

  • I'd benchmark to see if _clean_up is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff)

  • For parity with Python sets rename put to insert

Here's a big detail:



The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys()) indicates a data dependency that could probably be refactored away. That assertion should never fail, but because you manage deques and elem_sets separately, they could get out of sync. It also gives you multiple ways to get all of the priority levels (e.g. self._deques.keys() and self._item_sets.keys()). You can fix this by grouping a deque and a set together into a PriorityLevel.



class PriorityLevel(Generic[T]):
    """The items queued at a single priority, kept unique and in FIFO order.

    The deque remembers insertion order and the set answers membership in
    O(1). Both are only ever mutated together inside this class, which is
    what keeps them from drifting out of sync.
    """

    def __init__(self) -> None:
        self._ordered_items: Deque[T] = deque()
        self._items_set: Set[T] = set()

    def __contains__(self, item: T) -> bool:
        return item in self._items_set

    def __len__(self) -> int:
        return len(self._ordered_items)

    def insert(self, item: T) -> bool:
        """Append *item* to the back of this level.

        Returns False without modifying anything if *item* is already here.
        """
        if item in self._items_set:
            return False
        self._ordered_items.append(item)
        self._items_set.add(item)
        return True

    def discard(self, item: T) -> bool:
        """Remove *item* if present and return whether anything was removed."""
        if item not in self._items_set:
            return False
        self._items_set.remove(item)
        # deque.remove is a linear scan, but only over this one level.
        self._ordered_items.remove(item)
        return True

    def pop(self) -> T:
        """Remove and return the oldest item at this level.

        Raises IndexError if the level is empty.
        """
        # popleft (not pop): insert() appends on the right, so taking from
        # the left keeps equal-priority items first-in, first-out.
        item = self._ordered_items.popleft()
        self._items_set.remove(item)
        return item


Then have your datastructure maintain one defaultdict of these:



class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue in which every item occurs at most once.

    Each priority maps to a PriorityLevel that owns both the ordering and the
    membership data for that priority, so the two cannot get out of sync.
    """

    def __init__(self) -> None:
        self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(
            PriorityLevel
        )
        # Guards _levels. Must be reentrant: insert() calls discard() while
        # it already holds the lock, which would deadlock a plain Lock.
        self._lock = threading.RLock()

    def __contains__(self, item: T) -> bool:
        """Return True if *item* is queued at any priority."""
        with self._lock:
            return any(item in level for level in self._levels.values())

    def insert(self, item: T, *, priority: int = 0) -> bool:
        """Queue *item* at *priority*, first removing it from any other level.

        Returns the result of ``PriorityLevel.insert`` for the target level.
        """
        with self._lock:
            self.discard(item)
            return self._levels[priority].insert(item)

    def discard(self, item: T) -> bool:
        """Remove *item* from whichever level holds it; return whether it was found."""
        with self._lock:
            # insert() keeps items unique across levels, so stopping at the
            # first successful discard (any() short-circuits) is sufficient.
            return any(level.discard(item) for level in self._levels.values())

    # etc. (pop, __len__, __bool__, ...)


In this way you never have to have a priority and then hope that self.deques[priority] and self.elem_sets[priority] gives you what you want (and they haven't gotten out of sync).



You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel may also allow you to do per-level locking, which may be more efficient than global locking.



Also, don't check if item in self before doing a discard for example. It's just needless work. Do the discard and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify, which I suspect stems from this).






share|improve this answer






















  • Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
    – Tobias Hermann
    Aug 24 at 13:17










  • Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
    – Bailey Parker
    Aug 24 at 13:20






  • 1




    Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
    – Mathias Ettinger
    Aug 25 at 21:37










  • @MathiasEttinger Indeed, both of these are excellent suggestions :)
    – Bailey Parker
    Aug 25 at 21:40












up vote
10
down vote



accepted







up vote
10
down vote



accepted






Let's start with the good:



  • Ohh recent Python 3, nice!

  • Good documentation

  • Good use of typing. This is great for datastructures.

  • Good formatting of your code. Nice spacing. Decent naming

  • Good use of defaultdict

  • Good use of asserts

Things I noticed along the way:




  • contains is usually spelled __contains__. This way you can do item in queue.

  • I'd rename contains_with_prio. I'm not sure if I would call it contains, but queue.contains(item, priority=5) reads better to me. At the least, don't abbreviate priority.

  • I'd name condition_var to just lock. with self.lock reads better this way.

  • If you name it remove for parity with how the builtin set does it, I'd have it raise KeyError if item isn't in the queue instead of returning a bool. If you want to return a bool, then name it discard (also for parity).

  • I'd do assert removed_count in (0, 1) (tuples are preferred when you don't need to modify the object)

  • Instead of empty define a __bool__ which returns True if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty

  • Rename size to __len__ so you can do len(queue)

  • Rename get to pop (you even call it this in the comment) for parity with sets (and python's heap implementation)

  • For imports, I generally prefer from unittest import TestCase. Repeating the module name tends to be tedious and in many cases doesn't add much.

  • Make sure your tests are in a separate file (if the source is in foo/ordered_set_priority_queue.py you'd typically place them in test/foo/test_ordered_set_priority_queue.py; this allows you--assuming you've added all the __init__.pys--to do python3 -m unittest discover . to run all tests)


  • queue.put(0, 1) is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer: queue.put(0, priority=1)

  • Using Thread in a test is fairly fragile. Generally, testing concurrent data structures is tricky business: this test may not fail even when there is a problem in your code unless you're very careful (see https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures). You're probably better off introducing special breakpoints in your code (removed for production) that allow you to pause your data structure so you can deliberately craft degenerate access patterns.

  • One of the key parts of a priority queue is constant-time access to the top of the queue. You don't actually have that, because in your get you do sorted(self.deques.keys())[-1]. That is O(n log n) in the number of priority levels, and wasteful because it sorts the whole list when you only need the largest key; it also builds a new list in memory. Consider using Python's heapq module to maintain a heap of priorities (heapq is a min-heap, so push negated priorities if higher values should come out first). Peeking at the top priority is then O(1), and adding or removing a priority level is O(log n).

  • Generally we prefix private attributes with _, so self._deques, self._elem_sets (call this self._item_sets, don't abbreviate), self._lock, etc.

  • There's no need to do priorities = list(self.elem_sets.keys()) in _clean_up()

  • I'd benchmark to see if _clean_up is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff)

  • For parity with Python sets rename put to insert

Here's a big detail:



The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys()) indicates a data dependency that could probably be refactored away. That assertion should never fail, but because you manage deques and elem_sets separately, they could get out of sync. It also gives you multiple ways to get all of the priority levels (e.g. self._deques.keys() and self._item_sets.keys()). You can fix this by grouping a deque and a set together into a PriorityLevel.



class PriorityLevel(Generic[T]):
    """The items queued at a single priority, kept unique and in FIFO order.

    The deque remembers insertion order and the set answers membership in
    O(1). Both are only ever mutated together inside this class, which is
    what keeps them from drifting out of sync.
    """

    def __init__(self) -> None:
        self._ordered_items: Deque[T] = deque()
        self._items_set: Set[T] = set()

    def __contains__(self, item: T) -> bool:
        return item in self._items_set

    def __len__(self) -> int:
        return len(self._ordered_items)

    def insert(self, item: T) -> bool:
        """Append *item* to the back of this level.

        Returns False without modifying anything if *item* is already here.
        """
        if item in self._items_set:
            return False
        self._ordered_items.append(item)
        self._items_set.add(item)
        return True

    def discard(self, item: T) -> bool:
        """Remove *item* if present and return whether anything was removed."""
        if item not in self._items_set:
            return False
        self._items_set.remove(item)
        # deque.remove is a linear scan, but only over this one level.
        self._ordered_items.remove(item)
        return True

    def pop(self) -> T:
        """Remove and return the oldest item at this level.

        Raises IndexError if the level is empty.
        """
        # popleft (not pop): insert() appends on the right, so taking from
        # the left keeps equal-priority items first-in, first-out.
        item = self._ordered_items.popleft()
        self._items_set.remove(item)
        return item


Then have your datastructure maintain one defaultdict of these:



class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue in which every item occurs at most once.

    Each priority maps to a PriorityLevel that owns both the ordering and the
    membership data for that priority, so the two cannot get out of sync.
    """

    def __init__(self) -> None:
        self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(
            PriorityLevel
        )
        # Guards _levels. Must be reentrant: insert() calls discard() while
        # it already holds the lock, which would deadlock a plain Lock.
        self._lock = threading.RLock()

    def __contains__(self, item: T) -> bool:
        """Return True if *item* is queued at any priority."""
        with self._lock:
            return any(item in level for level in self._levels.values())

    def insert(self, item: T, *, priority: int = 0) -> bool:
        """Queue *item* at *priority*, first removing it from any other level.

        Returns the result of ``PriorityLevel.insert`` for the target level.
        """
        with self._lock:
            self.discard(item)
            return self._levels[priority].insert(item)

    def discard(self, item: T) -> bool:
        """Remove *item* from whichever level holds it; return whether it was found."""
        with self._lock:
            # insert() keeps items unique across levels, so stopping at the
            # first successful discard (any() short-circuits) is sufficient.
            return any(level.discard(item) for level in self._levels.values())

    # etc. (pop, __len__, __bool__, ...)


In this way you never have to have a priority and then hope that self.deques[priority] and self.elem_sets[priority] gives you what you want (and they haven't gotten out of sync).



You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel may also allow you to do per-level locking, which may be more efficient than global locking.



Also, don't check if item in self before doing a discard for example. It's just needless work. Do the discard and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify, which I suspect stems from this).






share|improve this answer














Let's start with the good:



  • Ohh recent Python 3, nice!

  • Good documentation

  • Good use of typing. This is great for datastructures.

  • Good formatting of your code. Nice spacing. Decent naming

  • Good use of defaultdict

  • Good use of asserts

Things I noticed along the way:




  • contains is usually spelled __contains__. This way you can do item in queue.

  • I'd rename contains_with_prio. I'm not sure if I would call it contains, but queue.contains(item, priority=5) reads better to me. At the least, don't abbreviate priority.

  • I'd name condition_var to just lock. with self.lock reads better this way.

  • If you name it remove for parity with how the builtin set does it, I'd have it raise KeyError if item isn't in the queue instead of returning a bool. If you want to return a bool, then name it discard (also for parity).

  • I'd do assert removed_count in (0, 1) (tuples are preferred when you don't need to modify the object)

  • Instead of empty define a __bool__ which returns True if the queue has items. This is for parity with lists. lists and other builtins are falsy when empty

  • Rename size to __len__ so you can do len(queue)

  • Rename get to pop (you even call it this in the comment) for parity with sets (and python's heap implementation)

  • For imports, I generally prefer from unittest import TestCase. Repeating the module name tends to be tedious and in many cases doesn't add much.

  • Make sure your tests are in a separate file (if the source is in foo/ordered_set_priority_queue.py you'd typically place them in test/foo/test_ordered_set_priority_queue.py; this allows you--assuming you've added all the __init__.pys--to do python3 -m unittest discover . to run all tests)


  • queue.put(0, 1) is unclear without looking at the method definition. Perhaps use a kwarg here to make it clearer: queue.put(0, priority=1)

  • Using Thread in a test is fairly fragile. Generally, testing concurrent data structures is tricky business: this test may not fail even when there is a problem in your code unless you're very careful (see https://stackoverflow.com/questions/8995491/testing-concurrent-data-structures). You're probably better off introducing special breakpoints in your code (removed for production) that allow you to pause your data structure so you can deliberately craft degenerate access patterns.

  • One of the key parts of a priority queue is constant-time access to the top of the queue. You don't actually have that, because in your get you do sorted(self.deques.keys())[-1]. That is O(n log n) in the number of priority levels, and wasteful because it sorts the whole list when you only need the largest key; it also builds a new list in memory. Consider using Python's heapq module to maintain a heap of priorities (heapq is a min-heap, so push negated priorities if higher values should come out first). Peeking at the top priority is then O(1), and adding or removing a priority level is O(log n).

  • Generally we prefix private attributes with _, so self._deques, self._elem_sets (call this self._item_sets, don't abbreviate), self._lock, etc.

  • There's no need to do priorities = list(self.elem_sets.keys()) in _clean_up()

  • I'd benchmark to see if _clean_up is necessary. It may not be, and eliminating it makes the logic simpler (if you take my advice and use python heap stuff)

  • For parity with Python sets rename put to insert

Here's a big detail:



The fact that you assert sorted(self.deques.keys()) == sorted(self.elem_sets.keys()) indicates a data dependency that could probably be refactored away. That assertion should never fail, but because you manage deques and elem_sets separately, they could get out of sync. It also gives you multiple ways to get all of the priority levels (e.g. self._deques.keys() and self._item_sets.keys()). You can fix this by grouping a deque and a set together into a PriorityLevel.



class PriorityLevel(Generic[T]):
    """The items queued at a single priority, kept unique and in FIFO order.

    The deque remembers insertion order and the set answers membership in
    O(1). Both are only ever mutated together inside this class, which is
    what keeps them from drifting out of sync.
    """

    def __init__(self) -> None:
        self._ordered_items: Deque[T] = deque()
        self._items_set: Set[T] = set()

    def __contains__(self, item: T) -> bool:
        return item in self._items_set

    def __len__(self) -> int:
        return len(self._ordered_items)

    def insert(self, item: T) -> bool:
        """Append *item* to the back of this level.

        Returns False without modifying anything if *item* is already here.
        """
        if item in self._items_set:
            return False
        self._ordered_items.append(item)
        self._items_set.add(item)
        return True

    def discard(self, item: T) -> bool:
        """Remove *item* if present and return whether anything was removed."""
        if item not in self._items_set:
            return False
        self._items_set.remove(item)
        # deque.remove is a linear scan, but only over this one level.
        self._ordered_items.remove(item)
        return True

    def pop(self) -> T:
        """Remove and return the oldest item at this level.

        Raises IndexError if the level is empty.
        """
        # popleft (not pop): insert() appends on the right, so taking from
        # the left keeps equal-priority items first-in, first-out.
        item = self._ordered_items.popleft()
        self._items_set.remove(item)
        return item


Then have your datastructure maintain one defaultdict of these:



class OrderedSetPriorityQueue(Generic[T]):
    """A thread-safe priority queue in which every item occurs at most once.

    Each priority maps to a PriorityLevel that owns both the ordering and the
    membership data for that priority, so the two cannot get out of sync.
    """

    def __init__(self) -> None:
        self._levels: DefaultDict[int, PriorityLevel[T]] = defaultdict(
            PriorityLevel
        )
        # Guards _levels. Must be reentrant: insert() calls discard() while
        # it already holds the lock, which would deadlock a plain Lock.
        self._lock = threading.RLock()

    def __contains__(self, item: T) -> bool:
        """Return True if *item* is queued at any priority."""
        with self._lock:
            return any(item in level for level in self._levels.values())

    def insert(self, item: T, *, priority: int = 0) -> bool:
        """Queue *item* at *priority*, first removing it from any other level.

        Returns the result of ``PriorityLevel.insert`` for the target level.
        """
        with self._lock:
            self.discard(item)
            return self._levels[priority].insert(item)

    def discard(self, item: T) -> bool:
        """Remove *item* from whichever level holds it; return whether it was found."""
        with self._lock:
            # insert() keeps items unique across levels, so stopping at the
            # first successful discard (any() short-circuits) is sufficient.
            return any(level.discard(item) for level in self._levels.values())

    # etc. (pop, __len__, __bool__, ...)


In this way you never have to have a priority and then hope that self.deques[priority] and self.elem_sets[priority] gives you what you want (and they haven't gotten out of sync).



You may also want to consider not using a global lock. A readwrite lock may be more efficient (especially for non-mutating operations). Refactoring out PriorityLevel may also allow you to do per-level locking, which may be more efficient than global locking.



Also, don't check if item in self before doing a discard for example. It's just needless work. Do the discard and handle failure accordingly. Also be careful with how this interacts with locking (I see you needed a notify, which I suspect stems from this).







share|improve this answer














share|improve this answer



share|improve this answer








edited Aug 25 at 21:42

























answered Aug 24 at 12:46









Bailey Parker

1,426913




1,426913











  • Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
    – Tobias Hermann
    Aug 24 at 13:17










  • Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
    – Bailey Parker
    Aug 24 at 13:20






  • 1




    Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
    – Mathias Ettinger
    Aug 25 at 21:37










  • @MathiasEttinger Indeed, both of these are excellent suggestions :)
    – Bailey Parker
    Aug 25 at 21:40
















  • Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
    – Tobias Hermann
    Aug 24 at 13:17










  • Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
    – Bailey Parker
    Aug 24 at 13:20






  • 1




    Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
    – Mathias Ettinger
    Aug 25 at 21:37










  • @MathiasEttinger Indeed, both of these are excellent suggestions :)
    – Bailey Parker
    Aug 25 at 21:40















Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
– Tobias Hermann
Aug 24 at 13:17




Wow, thank you so much for this awesome review! I just already implemented the majority of your smaller suggestions: gist.github.com/Dobiasd/37705392b4aaa3a3539ba1a61efec6b6 Next I'll look into heaps and use a PriorityLevel class, mainly to avoid possible out-of-sync situations, but also to drop _clean_up and make pop O(1). Also I'll check out the readerwriterlock library. The reason why I do priorities = list(self._elem_sets.keys()) instead of directly using for priority in self._elem_sets.keys(): is RuntimeError: dictionary changed size during iteration. ;)
– Tobias Hermann
Aug 24 at 13:17












Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20




Glad I could help. And true! That makes sense. Though, that's a good reason for removing all of that complexity unless you can benchmark to prove it's needed. Simple algorithms usually perform better :)
– Bailey Parker
Aug 24 at 13:20




1




1




Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
– Mathias Ettinger
Aug 25 at 21:37




Nice review. I'd add two little things: a link to PEP 3102 (keyword-only arguments) since you almost name it, and the use of any for the implementation of __contains__: return any(item in elem_set for elem_set in self.elem_sets.values()).
– Mathias Ettinger
Aug 25 at 21:37












@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40




@MathiasEttinger Indeed, both of these are excellent suggestions :)
– Bailey Parker
Aug 25 at 21:40

















 

draft saved


draft discarded















































 


draft saved


draft discarded














// NOTE(review): scraped Stack Exchange page script (braces stripped during
// extraction, so not valid JavaScript as-is). It passes a callback to
// StackExchange.ready that calls initPostLogin for '.new-post-login'.
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f202393%2fa-thread-safe-priority-queue-keeping-its-elements-unique%23new-answer', 'question_page');

);

Post as a guest













































































Comments

Popular posts from this blog

What does second last employer means? [closed]

List of Gilmore Girls characters

Confectionery