MCPcopy Index your code
hub / github.com/zappa/Zappa / add_event_source

Function add_event_source

zappa/utilities.py:740–755  ·  view source on GitHub ↗

Given an event_source dictionary, create the object and add the event source.

(
    event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
)

Source from the content-addressed store, hash-verified

738
739
740def add_event_source(
741 event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
742) -> str:
743 """
744 Given an event_source dictionary, create the object and add the event source.
745 """
746 event_source_obj, function_arn = get_event_source(event_source, lambda_arn, target_function, boto_session, dry=False)
747 # TODO: Detect changes in config and refine exists algorithm
748 if not dry:
749 if not event_source_obj.status(function_arn):
750 event_source_obj.add(function_arn)
751 return "successful" if event_source_obj.status(function_arn) else "failed"
752 else:
753 return "exists"
754
755 return "dryrun"
756
757
758def remove_event_source(

Callers 3

test_add_event_sourceMethod · 0.90
schedule_eventsMethod · 0.85

Calls 3

get_event_sourceFunction · 0.85
statusMethod · 0.45
addMethod · 0.45

Tested by 1

test_add_event_sourceMethod · 0.72