MCPcopy Index your code
hub / github.com/raiden-network/raiden / NotifyingQueue

Class NotifyingQueue

raiden/utils/notifying_queue.py:9–49  ·  view source on GitHub ↗

This is not the same as a JoinableQueue. Here, instead of waiting for all the work to be processed, the wait is for work to be available.

Source from the content-addressed store, hash-verified

7
8
class NotifyingQueue(Event, Generic[T]):
    """A queue that doubles as an event signalling "items are available".

    This is not the same as a JoinableQueue. Here, instead of waiting for
    all the work to be processed, the wait is for work to be available.

    The event flag inherited from ``Event`` is set whenever an item is put
    and cleared by ``get`` once the queue has been drained.
    """

    def __init__(self, maxsize: int = None, items: Iterable[T] = ()) -> None:
        """Create the queue, optionally pre-populated with ``items``.

        Args:
            maxsize: Forwarded unchanged to the underlying ``Queue``.
            items: Initial content, forwarded unchanged to the underlying
                ``Queue``.
        """
        super().__init__()
        self.queue = Queue(maxsize, items)

        # Decide based on what actually ended up in the queue rather than on
        # the truthiness of ``items``: an iterator or generator is always
        # truthy, even when it yields nothing, which would leave the event
        # set while no work is available.
        if not self.queue.empty():
            self.set()

    def put(self, item: T) -> None:
        """Add new item to the queue and flag that work is available."""
        self.queue.put(item)
        self.set()

    def get(self, block: bool = True, timeout: float = None) -> T:
        """Remove and return an item from the queue.

        ``block`` and ``timeout`` are forwarded to the underlying queue's
        ``get``; any exception it raises propagates to the caller. The event
        flag is cleared when this call removes the last item.
        """
        value = self.queue.get(block, timeout)
        if self.queue.empty():
            self.clear()
        return value

    def peek(self, block: bool = True, timeout: float = None) -> T:
        """Return the underlying queue's ``peek`` result.

        ``block`` and ``timeout`` are forwarded as-is. The event flag is not
        touched by this method.
        """
        return self.queue.peek(block, timeout)

    def __len__(self) -> int:
        """Return the number of items currently queued."""
        return len(self.queue)

    def copy(self) -> List[T]:
        """Return the current queue items as a list.

        The items are read from a copy of the underlying queue, so
        ``self.queue`` itself is not consumed.
        """
        snapshot = self.queue.copy()

        result = []
        # Drain the throwaway copy, in the order ``get_nowait`` yields items.
        while not snapshot.empty():
            result.append(snapshot.get_nowait())
        return result

    def __repr__(self) -> str:
        return f"NotifyingQueue(id={id(self)}, num_items={len(self.queue)})"

Callers 3

__init__ (Method) · 0.90
test_queue (Function) · 0.90
test_not_empty (Function) · 0.90

Calls

no outgoing calls

Tested by 2

test_queue (Function) · 0.72
test_not_empty (Function) · 0.72