(self, *args, **kwargs)
| 475 | def decorator(f): |
| 476 | @functools.wraps(f) |
| 477 | def do_rpc(self, *args, **kwargs): |
| 478 | sig = inspect.signature(f) |
| 479 | bound_args = sig.bind(self, *args, **kwargs) |
| 480 | named = {} # Arguments for the remote process |
| 481 | extra = {} # Arguments for the local process |
| 482 | for name, param in sig.parameters.items(): |
| 483 | if name == "self": |
| 484 | continue |
| 485 | if name in bound_args.arguments: |
| 486 | if name == "wait": |
| 487 | extra[name] = bound_args.arguments[name] |
| 488 | else: |
| 489 | named[name] = bound_args.arguments[name] |
| 490 | else: |
| 491 | if param.default is not param.empty: |
| 492 | named[name] = param.default |
| 493 | |
| 494 | if self.server_version < since: |
| 495 | raise self.RPCServerOutdated(f.__name__, format_version(since)) |
| 496 | |
| 497 | for name, restriction in kwargs_decorator.items(): |
| 498 | if restriction["since"] <= self.server_version: |
| 499 | continue |
| 500 | if "previously" in restriction and named[name] == restriction["previously"]: |
| 501 | continue |
| 502 | if restriction.get("dontcare", False): |
| 503 | continue |
| 504 | |
| 505 | raise self.RPCServerOutdated( |
| 506 | f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"]) |
| 507 | ) |
| 508 | |
| 509 | return self.call(f.__name__, named, **extra) |
| 510 | |
| 511 | return do_rpc |
| 512 |
nothing calls this directly
no test coverage detected