MCPcopy
hub / github.com/ag2ai/faststream / get_broker

Method get_broker

tests/brokers/base/basic.py:14–19  ·  view source on GitHub ↗
(
        self,
        apply_types: bool = False,
        **kwargs: Any,
    )

Source from the content-addressed store, hash-verified

12
13 @abstractmethod
14 def get_broker(
15 self,
16 apply_types: bool = False,
17 **kwargs: Any,
18 ) -> BrokerUsecase[Any, Any]:
19 raise NotImplementedError
20
21 def patch_broker(
22 self,

Calls

no outgoing calls