| 30 | from gui.tray import TaskbarPanel |
| 31 | |
| 32 | class P2PManager(WSInterface): |
def __init__(self, config: Config, is_login_phase=True):
    """Initialise manager state and start the background asyncio loop thread.

    Args:
        config: Configuration object; stored on the instance and passed to
            the clipboard, cipher and notification managers.
        is_login_phase: Stored unchanged as ``self.is_login_phase``.
            Defaults to ``True``.
    """
    self.config = config

    # Collaborating managers, each built from the same configuration.
    self.clipboard_manager = ClipboardManager(self.config)
    self.cipher_manager = CipherManager(self.config)
    self.notification_manager = NotificationManager(self.config)

    # Not created here; starts out unset.
    self.sys_tray: Optional[TaskbarPanel] = None

    # Connection / session flags.
    self.first_conn_lost = True
    self.is_login_phase = is_login_phase
    self.ws_client = None
    self.is_connected = False
    self.disconnected = False
    self.is_auto_reconnecting = False
    self._suppress_auto_reconnect_once = False
    self.is_clipboard_monitoring_on = False

    # Fragment bookkeeping.
    # Id of the fragment currently being sent.
    self.sending_fragment_id = ""
    # Mapping: fragment id (str) -> fragment parts (list[str]).
    self.receiving_fragments: dict = {}
    self.sending_fragment_stats: Optional[str] = None
    self.receiving_fragment_stats: Optional[str] = None

    # P2P bookkeeping.
    # Own peer id, assigned by the server.
    self.my_peer_id: Optional[str] = None
    # All peers in this 'room'.
    self.peers: set[str] = set()
    # Mapping: peer_id -> RTCPeerConnection.
    self.peer_connections: dict[str, RTCPeerConnection] = {}
    # Mapping: peer_id -> RTCDataChannel.
    self.data_channels: dict[str, RTCDataChannel] = {}
    # Number of open data channels (derived; see _sync).
    self.live_connections: int = 0
    self._live_connections_lock = Lock()
    # When PEER_LIST arrives before ASSIGNED_ID (e.g. right after a signaling
    # reconnect), the list is parked here and mesh setup waits.
    self._pending_peer_list: Optional[List[str]] = None

    # True during full P2P teardown (logout / signaling reconnect prep);
    # suppresses ICE recovery.
    self._p2p_shutting_down: bool = False
    # Per-peer locks that serialize recovery and offer handling (same asyncio
    # loop as the signaling handlers).
    self._peer_recovery_locks: Dict[str, asyncio.Lock] = {}
    self._dc_heartbeat_handle: Optional[asyncio.TimerHandle] = None

    # Dedicated event loop, driven forever by a daemon thread so it does not
    # block interpreter shutdown.
    self.loop = asyncio.new_event_loop()
    loop_runner = Thread(
        name="P2PManagerEventLoopThread",
        target=self.loop.run_forever,
        daemon=True,
    )
    self.loop_thread = loop_runner
    loop_runner.start()
| 78 | |
def schedule_task(self, coro):
    """Hand a coroutine object to the manager's event loop from any thread.

    Args:
        coro: The coroutine object to run on ``self.loop``.

    Returns:
        The future produced by ``asyncio.run_coroutine_threadsafe``, which
        the caller can use to wait for, or cancel, the scheduled work.
    """
    target_loop = self.loop
    pending = asyncio.run_coroutine_threadsafe(coro, target_loop)
    return pending
| 82 | |
def _cancel_dc_heartbeat(self) -> None:
    """Cancel the stored heartbeat timer handle, if any, and clear it.

    Does nothing when no handle is currently stored, so it is safe to call
    repeatedly.
    """
    pending = self._dc_heartbeat_handle
    if pending is None:
        return
    pending.cancel()
    self._dc_heartbeat_handle = None
| 87 | |
| 88 | def _dc_heartbeat_tick(self) -> None: |
| 89 | self._dc_heartbeat_handle = None |