MCPcopy Index your code
hub / github.com/zappa/Zappa / get_function_for_aws_event

Method get_function_for_aws_event

zappa/handler.py:634–663  ·  view source on GitHub ↗

Get the associated function to execute for a triggered AWS event. Supports S3, SNS, DynamoDB, Kinesis and SQS events.

(self, record)

Source from the content-addressed store, hash-verified

632 return result
633
def get_function_for_aws_event(self, record):
    """
    Get the associated function to execute for a triggered AWS event.

    Supports S3, SNS, DynamoDB, Kinesis and SQS event records.

    Resolution order:

    1. An S3 record whose ``configurationId`` contains ``":"`` returns the
       text after the last ``":"``.
    2. An SNS record whose ``Message`` decodes to a JSON object with a
       truthy ``"command"`` entry returns that entry.
    3. Otherwise the ARN of the event source (SNS ``TopicArn``, the
       record's ``eventSourceARN`` for DynamoDB / Kinesis / SQS, or the S3
       bucket ``arn``) is looked up in ``self.settings.AWS_EVENT_MAPPING``.

    :param record: a single event record (mapping) taken from the event.
    :return: the value resolved by the rules above, or ``None`` when the
        record type is not recognised or its ARN has no mapping.
    """
    if "s3" in record:
        configuration_id = record["s3"]["configurationId"]
        if ":" in configuration_id:
            return configuration_id.split(":")[-1]

    arn = None
    if "Sns" in record:
        sns = record["Sns"]
        try:
            message = json.loads(sns["Message"])
        except ValueError:
            # Message is not JSON at all; fall back to the topic mapping.
            message = None
        # A message can be valid JSON without being an object (a list, a
        # number, a bare string, null). Only an object can carry "command";
        # anything else must fall through to the TopicArn lookup instead of
        # raising AttributeError on ``.get``.
        if isinstance(message, dict) and message.get("command"):
            return message["command"]
        arn = sns.get("TopicArn")
    elif (
        "dynamodb" in record
        or "kinesis" in record
        or record.get("eventSource") == "aws:sqs"
    ):
        arn = record.get("eventSourceARN")
    elif "s3" in record:
        arn = record["s3"]["bucket"]["arn"]

    if arn:
        return self.settings.AWS_EVENT_MAPPING.get(arn)

    return None
664
665 def get_function_from_bot_intent_trigger(self, event):
666 """

Callers 1

handlerMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected