MCPcopy
hub / github.com/raiden-network/raiden / ProxyManager

Class ProxyManager

raiden/network/proxies/proxy_manager.py:63–302  ·  view source on GitHub ↗

Encapsulates access and creation of contract proxies. This class keeps track of mapping between contract addresses and their internal contract proxy counterparts. It also synchronizes creation of proxies, so that a 1-to-1 relationship is kept.

Source from the content-addressed store, hash-verified

61
62
63class ProxyManager:
64 """Encapsulates access and creation of contract proxies.
65
66 This class keeps track of mapping between contract addresses and their internal
67 contract proxy counterparts. It also synchronizes creation of proxies, so that
68 a 1-to-1 relationship is kept.
69 """
70
71 # pylint: disable=too-many-instance-attributes
72
73 def __init__(
74 self,
75 rpc_client: JSONRPCClient,
76 contract_manager: ContractManager,
77 metadata: ProxyManagerMetadata,
78 ) -> None:
79 self.address_to_secret_registry: Dict[SecretRegistryAddress, SecretRegistry] = {}
80 self.address_to_token: Dict[TokenAddress, Token] = {}
81 self.address_to_custom_token: Dict[TokenAddress, CustomToken] = {}
82 self.address_to_token_network: Dict[TokenNetworkAddress, TokenNetwork] = {}
83 self.address_to_token_network_registry: Dict[
84 TokenNetworkRegistryAddress, TokenNetworkRegistry
85 ] = {}
86 self.address_to_user_deposit: Dict[UserDepositAddress, UserDeposit] = {}
87 self.address_to_service_registry: Dict[ServiceRegistryAddress, ServiceRegistry] = {}
88 self.address_to_monitoring_service: Dict[MonitoringServiceAddress, MonitoringService] = {}
89 self.address_to_one_to_n: Dict[OneToNAddress, OneToN] = {}
90 self.identifier_to_payment_channel: Dict[
91 Tuple[TokenNetworkAddress, ChannelID], PaymentChannel
92 ] = {}
93
94 self.client = rpc_client
95 self.contract_manager = contract_manager
96 self.metadata = metadata
97
98 # exposing the lock since it is needed for a proper gas_reserve
99 # estimation
100 self.token_network_creation_lock = Semaphore()
101
102 self._token_creation_lock = Semaphore()
103 self._token_network_registry_creation_lock = Semaphore()
104 self._secret_registry_creation_lock = Semaphore()
105 self._service_registry_creation_lock = Semaphore()
106 self._payment_channel_creation_lock = Semaphore()
107 self._user_deposit_creation_lock = Semaphore()
108 self._monitoring_service_creation_lock = Semaphore()
109 self._one_to_n_creation_lock = Semaphore()
110
111 def token(self, token_address: TokenAddress, block_identifier: BlockIdentifier) -> Token:
112 """Return a proxy to interact with a token."""
113 if not is_binary_address(token_address):
114 raise ValueError("token_address must be a valid address")
115
116 with self._token_creation_lock:
117 if token_address not in self.address_to_token:
118 self.address_to_token[token_address] = Token(
119 jsonrpc_client=self.client,
120 token_address=token_address,

Calls

no outgoing calls