Given an event_source dictionary item, a session and a lambda_arn, create and return the appropriate event source object.
(
event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False
)
| 708 | |
| 709 | |
| 710 | def get_event_source( |
| 711 | event_source: Dict[str, Any], lambda_arn: str, target_function: str, boto_session: boto3.Session, dry: bool = False |
| 712 | ) -> Tuple[BaseEventSource, str]: |
| 713 | """ |
| 714 | Given an event_source dictionary item, a session and a lambda_arn, |
| 715 | create and return the appropriate event source object. |
| 716 | """ |
| 717 | event_source_map: Dict[str, type[BaseEventSource]] = { |
| 718 | "dynamodb": DynamoDBStreamEventSource, |
| 719 | "kinesis": KinesisEventSource, |
| 720 | "s3": S3EventSource, |
| 721 | "sns": SNSEventSource, |
| 722 | "sqs": SqsEventSource, |
| 723 | "events": CloudWatchEventSource, |
| 724 | } |
| 725 | |
| 726 | arn = event_source["arn"] |
| 727 | _, _, svc, _ = arn.split(":", 3) |
| 728 | |
| 729 | event_source_class = event_source_map.get(svc, None) |
| 730 | if not event_source_class: |
| 731 | raise ValueError("Unknown event source: {0}".format(arn)) |
| 732 | |
| 733 | function_arn = lambda_arn |
| 734 | |
| 735 | event_source_obj = event_source_class(boto_session, event_source) |
| 736 | |
| 737 | return event_source_obj, function_arn |
| 738 | |
| 739 | |
| 740 | def add_event_source( |
no outgoing calls
no test coverage detected