Send a best-effort notification on the standalone stream. Never raises. If there's no standalone channel or the stream is broken, the notification is dropped and debug-logged.
(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None)
| 276 | return cls.model_validate(raw, by_name=False) |
| 277 | |
| 278 | async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None: |
| 279 | """Send a best-effort notification on the standalone stream. |
| 280 | |
| 281 | Never raises. If there's no standalone channel or the stream is broken, |
| 282 | the notification is dropped and debug-logged. |
| 283 | """ |
| 284 | try: |
| 285 | await self.outbound.notify(method, params, opts) |
| 286 | except (anyio.BrokenResourceError, anyio.ClosedResourceError): |
| 287 | logger.debug("dropped %s: standalone stream closed", method) |
| 288 | |
| 289 | async def ping(self, *, meta: Meta | None = None, opts: CallOptions | None = None) -> None: |
| 290 | """Send a `ping` request on the standalone stream. |
no test coverage detected