| 66 | |
| 67 | |
class _SyncSignal(Generic[P], _SignalMixin):
    """Signal flavour for plain (non-coroutine) receivers.

    Each method delegates to the same-named ``_SignalMixin`` method (``send``
    delegates to ``notify``) and adds assertion-based sanity checks that
    nothing asynchronous slips in.
    """

    def connect(self, receiver: Callable[P, None]) -> None:
        """Hand *receiver* to the mixin's ``connect``.

        Raises:
            AssertionError: if *receiver* is a coroutine function.
        """
        is_coroutine_fn = inspect.iscoroutinefunction(receiver)
        # Sanity check only: assert statements are skipped under ``python -O``.
        assert not is_coroutine_fn
        super().connect(receiver)

    def disconnect(self, receiver: Callable[P, None]) -> None:
        """Hand *receiver* to the mixin's ``disconnect``."""
        super().disconnect(receiver)

    def send(self, *args: P.args, **kwargs: P.kwargs) -> None:
        """Forward the arguments to the mixin's ``notify`` and check its results.

        Every value produced by ``notify`` must be ``None`` or a non-awaitable
        object (presumably these are the receivers' return values -- confirm
        against ``_SignalMixin.notify``).

        Raises:
            AssertionError: if ``notify`` produces an awaitable.
        """
        for outcome in super().notify(*args, **kwargs):
            if outcome is None:
                # Nothing to inspect for the common "returned nothing" case.
                continue
            assert not inspect.isawaitable(outcome)
| 79 | |
| 80 | |
| 81 | class _AsyncSignal(Generic[P], _SignalMixin): |
no outgoing calls
no test coverage detected
searching dependent graphs…