MCPcopy Index your code
hub / github.com/alievk/avatarify-python / VideoCaptureAsync

Class VideoCaptureAsync

afy/videocaptureasync.py:11–72  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

9
10
class VideoCaptureAsync:
    """Camera wrapper that keeps the latest frame fresh on a background thread.

    The capture device is opened in ``__init__``; ``start()`` launches a daemon
    thread running ``update()``, which keeps overwriting ``self.frame`` with
    the most recent successfully grabbed image. ``read()`` hands out a copy of
    that image without blocking on the device.
    """

    def __init__(self, src=0, width=640, height=480):
        """Open the capture device and attempt one initial read.

        Args:
            src: Source passed unchanged to ``cv2.VideoCapture``.
            width: Value requested for ``cv2.CAP_PROP_FRAME_WIDTH``.
            height: Value requested for ``cv2.CAP_PROP_FRAME_HEIGHT``.

        Raises:
            RuntimeError: If the capture reports that it is not opened.
        """
        self.src = src

        self.cap = cv2.VideoCapture(self.src)
        if not self.cap.isOpened():
            raise RuntimeError("Cannot open camera")

        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # This first read is allowed to fail; start() waits until a frame has
        # actually been grabbed before returning.
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def set(self, var1, var2):
        """Forward a property assignment to the underlying capture.

        Args:
            var1: Property identifier.
            var2: New value for that property.

        Returns:
            Whatever the underlying ``set`` call returns, so callers can tell
            whether the property was accepted.
        """
        return self.cap.set(var1, var2)

    def isOpened(self):
        """Return the underlying capture's ``isOpened()`` result."""
        return self.cap.isOpened()

    def start(self):
        """Start the background capture thread and wait for a first frame.

        Returns:
            ``self`` once a frame has been grabbed, or ``None`` (after
            printing a warning) if capturing was already started.

        Raises:
            RuntimeError: If no frame was grabbed within ``WARMUP_TIMEOUT``
                seconds. The instance is left stopped, so ``start()`` may be
                called again.
        """
        if self.started:
            print('[!] Asynchronous video capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, args=(), daemon=True)
        self.thread.start()

        # (warmup) wait for the first successfully grabbed frame.
        # A monotonic clock keeps the timeout immune to wall-clock adjustments.
        warmup_start_time = time.monotonic()
        while not self.grabbed:
            warmup_elapsed_time = time.monotonic() - warmup_start_time
            if warmup_elapsed_time > WARMUP_TIMEOUT:
                # Roll back: otherwise the worker keeps running and a retry
                # would be rejected as "already started".
                self.started = False
                # Bounded wait so a worker stuck inside cap.read() cannot
                # hang the caller.
                self.thread.join(timeout=1.0)
                raise RuntimeError(f"Failed to succesfully grab frame from the camera (timeout={WARMUP_TIMEOUT}s). Try to restart.")

            time.sleep(0.5)

        return self

    def update(self):
        """Capture loop executed by the background thread.

        Runs while ``self.started`` is true. Failed or empty reads are skipped
        so the previously stored frame stays available to ``read()``.
        """
        while self.started:
            grabbed, frame = self.cap.read()
            if not grabbed or frame is None or frame.size == 0:
                continue
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        """Return the most recently stored frame.

        Returns:
            A ``(grabbed, frame)`` tuple. ``frame`` is a copy, so the caller
            may modify it freely. If no frame has been stored yet, ``frame``
            is ``None`` instead of the call raising ``AttributeError``.
        """
        with self.read_lock:
            grabbed = self.grabbed
            frame = self.frame
            if frame is not None:
                frame = frame.copy()
        return grabbed, frame

    def stop(self):
        """Ask the capture loop to finish by clearing the ``started`` flag."""
        self.started = False

Callers 1

cam_fomm.pyFile · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected