This is not the same as a JoinableQueue. Here, instead of waiting for all the work to be processed, the wait is for work to be available.
| 7 | |
| 8 | |
| 9 | class NotifyingQueue(Event, Generic[T]): |
| 10 | """This is not the same as a JoinableQueue. Here, instead of waiting for |
| 11 | all the work to be processed, the wait is for work to be available. |
| 12 | """ |
| 13 | |
| 14 | def __init__(self, maxsize: int = None, items: Iterable[T] = ()) -> None: |
| 15 | super().__init__() |
| 16 | self.queue = Queue(maxsize, items) |
| 17 | |
| 18 | if items: |
| 19 | self.set() |
| 20 | |
| 21 | def put(self, item: T) -> None: |
| 22 | """Add new item to the queue.""" |
| 23 | self.queue.put(item) |
| 24 | self.set() |
| 25 | |
| 26 | def get(self, block: bool = True, timeout: float = None) -> T: |
| 27 | """Removes and returns an item from the queue.""" |
| 28 | value = self.queue.get(block, timeout) |
| 29 | if self.queue.empty(): |
| 30 | self.clear() |
| 31 | return value |
| 32 | |
| 33 | def peek(self, block: bool = True, timeout: float = None) -> T: |
| 34 | return self.queue.peek(block, timeout) |
| 35 | |
| 36 | def __len__(self) -> int: |
| 37 | return len(self.queue) |
| 38 | |
| 39 | def copy(self) -> List[T]: |
| 40 | """Copies the current queue items.""" |
| 41 | copy = self.queue.copy() |
| 42 | |
| 43 | result = [] |
| 44 | while not copy.empty(): |
| 45 | result.append(copy.get_nowait()) |
| 46 | return result |
| 47 | |
| 48 | def __repr__(self) -> str: |
| 49 | return f"NotifyingQueue(id={id(self)}, num_items={len(self.queue)})" |
no outgoing calls