MCPcopy Index your code
hub / github.com/zappa/Zappa / invoke

Method invoke

zappa/cli.py:1551–1597  ·  view source on GitHub ↗

Invoke a remote function.

(
        self, function_name, raw_python=False, command=None, no_color=False, client_context=None, qualifier=None, payload=None
    )

Source from the content-addressed store, hash-verified

1549 click.echo("SNS Topic removed: %s" % ", ".join(removed_arns))
1550
1551 def invoke(
1552 self, function_name, raw_python=False, command=None, no_color=False, client_context=None, qualifier=None, payload=None
1553 ):
1554 """
1555 Invoke a remote function.
1556 """
1557
1558 # There are three likely scenarios for 'command' here:
1559 # command, which is a modular function path
1560 # raw_command, which is a string of python to execute directly
1561 # manage, which is a Django-specific management command invocation
1562 key = command if command is not None else "command"
1563 if raw_python:
1564 command = {"raw_command": function_name}
1565 else:
1566 command = {key: function_name}
1567
1568 if payload:
1569 import json as json_module
1570
1571 try:
1572 payload_dict = json_module.loads(payload)
1573 except (ValueError, TypeError) as e:
1574 raise ClickException("--payload must be valid JSON: {}".format(e))
1575 if not isinstance(payload_dict, dict):
1576 raise ClickException("--payload must be a JSON object (dict), not {}.".format(type(payload_dict).__name__))
1577 command.update(payload_dict)
1578
1579 client_context = base64.b64encode(client_context.encode("utf-8")).decode("utf-8") if client_context else None
1580
1581 # Can't use hjson
1582 import json as json
1583
1584 response = self.zappa.invoke_lambda_function(
1585 self.lambda_name,
1586 json.dumps(command),
1587 invocation_type="RequestResponse",
1588 client_context=client_context,
1589 qualifier=qualifier,
1590 )
1591
1592 print(self.format_lambda_response(response, not no_color))
1593
1594 # For a successful request FunctionError is not in response.
1595 # https://github.com/Miserlou/Zappa/pull/1254/
1596 if "FunctionError" in response:
1597 raise ClickException("{} error occurred while invoking command.".format(response["FunctionError"]))
1598
1599 def format_lambda_response(self, response, colorize=True):
1600 if "LogResult" in response:

Calls 3

updateMethod · 0.45