MCPcopy
hub / github.com/smittix/intercept / flush

Method flush

utils/morse.py:692–716  ·  view source on GitHub ↗

Flush pending symbols at end-of-stream.

(self)

Source from the content-addressed store, hash-verified

690 return events
691
def flush(self) -> list[dict[str, Any]]:
    """Drain any buffered Morse state once the input stream has ended.

    Two things may still be pending when the stream stops:

    * a tone that is still on -- if its length (tone blocks plus dropout
      blocks) reaches ``_dit_min`` it is classified as a dit or a dah,
      appended to the current symbol, and reported as a
      ``'morse_element'`` event;
    * a non-empty current symbol -- it is passed to ``_decode_symbol``
      together with the current wall-clock time (``HH:MM:SS``), and the
      result is reported unless it is ``None``.

    Afterwards the tone flag and the tone/silence/dropout block counters
    are reset regardless of whether anything was emitted.

    Returns:
        The events produced by the flush, in the order described above;
        an empty list when nothing was pending.
    """
    flushed: list[dict[str, Any]] = []

    # Close out a tone that was still sounding when the stream ended.
    if self._tone_on:
        held_blocks = self._tone_blocks + self._dropout_blocks
        if held_blocks >= self._dit_min:
            if held_blocks >= self._dah_threshold:
                mark = '-'
            else:
                mark = '.'
            self._current_symbol += mark
            duration_ms = round(held_blocks * self._block_duration * 1000.0, 1)
            flushed.append({
                'type': 'morse_element',
                'element': mark,
                'duration_ms': duration_ms,
            })

    # Decode whatever symbol has accumulated, then clear it.
    pending = self._current_symbol
    if pending:
        stamp = datetime.now().strftime('%H:%M:%S')
        result = self._decode_symbol(pending, stamp)
        if result is not None:
            flushed.append(result)
        self._current_symbol = ''

    # Return the tone tracker to its idle state.
    self._tone_on = False
    self._tone_blocks = self._silence_blocks = self._dropout_blocks = 0.0
    return flushed
717
718
719def _wav_to_mono_float(path: Path) -> tuple[np.ndarray, int]:

Calls 1

_decode_symbolMethod · 0.95