MCPcopy
hub / github.com/gorakhargosh/watchdog / get

Method get

src/watchdog/utils/delayed_queue.py:39–66  ·  view source on GitHub ↗

Remove and return an element from the queue, or return None if this queue has been closed.

(self)

Source from the content-addressed store, hash-verified

37 self._not_empty.release()
38
def get(self) -> T | None:
    """Remove and return the element at the head of the queue.

    Blocks until the queue is non-empty or has been closed. If the head
    element was queued with its delay flag set, sleeps until
    ``insert_time + delay_sec`` has passed before handing it out. If the
    head element is no longer at the front of the queue after the wait,
    the whole procedure starts over with the current head.

    Returns the head element, or ``None`` if the queue has been closed.
    """
    while True:
        # Wait for an element to be added to the queue (or for the queue
        # to be closed). The ``with`` block guarantees the condition's
        # lock is released even if ``wait()`` is interrupted by an
        # exception, which a bare acquire()/release() pair would not.
        with self._not_empty:
            while not self._queue and not self._closed:
                self._not_empty.wait()

            if self._closed:
                return None
            head, insert_time, delay = self._queue[0]

        # Wait for the delay if required. This happens outside the lock
        # so the queue can still be modified while this thread sleeps.
        if delay:
            time_left = insert_time + self.delay_sec - time.time()
            while time_left > 0:
                time.sleep(time_left)
                time_left = insert_time + self.delay_sec - time.time()

        # Return the element only if it is still at the head of the
        # queue; otherwise loop and pick up the new head.
        with self._lock:
            if self._queue and self._queue[0][0] is head:
                self._queue.popleft()
                return head
67
68 def remove(self, predicate: Callable[[T], bool]) -> T | None:
69 """Remove and return the first items for which predicate is True,

Callers 15

pathMethod · 0.45
test_delayed_getFunction · 0.45
test_nondelayed_getFunction · 0.45
expect_eventMethod · 0.45
test_basic_queueFunction · 0.45
test_prevent_consecutiveFunction · 0.45
test_createFunction · 0.45
test_closedFunction · 0.45

Calls

no outgoing calls

Tested by 15

test_delayed_getFunction · 0.36
test_nondelayed_getFunction · 0.36
test_basic_queueFunction · 0.36
test_prevent_consecutiveFunction · 0.36
test_createFunction · 0.36
test_closedFunction · 0.36
test_modifyFunction · 0.36