Given an event_source dictionary, create the object and add the event source.
(
event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
)
| 738 | |
| 739 | |
def add_event_source(
    event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
) -> str:
    """
    Build the event source object described by event_source and attach it.

    The object and the function ARN to operate on are both obtained from
    get_event_source().

    Returns one of:
      "dryrun"     - dry is true; nothing was added.
      "exists"     - the event source already reports a truthy status.
      "successful" - the source was added and its status is truthy afterwards.
      "failed"     - the source was added but its status is still falsy.
    """
    # NOTE(review): dry=False is passed unconditionally instead of the caller's
    # flag -- confirm against get_event_source whether that is intentional.
    source, source_arn = get_event_source(event_source, lambda_arn, target_function, boto_session, dry=False)

    # TODO: Detect changes in config and refine exists algorithm
    if dry:
        return "dryrun"

    if source.status(source_arn):
        return "exists"

    source.add(source_arn)
    # Re-query the status so the result reflects whether the add took effect.
    if source.status(source_arn):
        return "successful"
    return "failed"
| 756 | |
| 757 | |
| 758 | def remove_event_source( |