MCPcopy Index your code
hub / github.com/zappa/Zappa / get_event_source

Function get_event_source

zappa/utilities.py:710–737  ·  view source on GitHub ↗

Given an event_source dictionary item, a session and a lambda_arn, create and return the appropriate event source object.

(
    event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
)

Source from the content-addressed store, hash-verified

708
709
710def get_event_source(
711 event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
712) -> Tuple[BaseEventSource, str]:
713 """
714 Given an event_source dictionary item, a session and a lambda_arn,
715 create and return the appropriate event source object.
716 """
717 event_source_map: Dict[str, type[BaseEventSource]] = {
718 "dynamodb": DynamoDBStreamEventSource,
719 "kinesis": KinesisEventSource,
720 "s3": S3EventSource,
721 "sns": SNSEventSource,
722 "sqs": SqsEventSource,
723 "events": CloudWatchEventSource,
724 }
725
726 arn = event_source["arn"]
727 _, _, svc, _ = arn.split(":", 3)
728
729 event_source_class = event_source_map.get(svc, None)
730 if not event_source_class:
731 raise ValueError("Unknown event source: {0}".format(arn))
732
733 function_arn = lambda_arn
734
735 event_source_obj = event_source_class(boto_session, event_source)
736
737 return event_source_obj, function_arn
738
739
740def add_event_source(

Callers 3

add_event_sourceFunction · 0.85
remove_event_sourceFunction · 0.85
get_event_source_statusFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected