(self, function_arn: str)
| 437 | return function_arn.split(":")[-1] |
| 438 | |
def add(self, function_arn: str) -> None:
    """Allow S3 to invoke the function and register it as a bucket notification target.

    Two best-effort steps are performed; failures are logged, never raised:

    1. Add a Lambda resource-policy statement letting ``s3.amazonaws.com``
       invoke ``function_arn`` for events originating from ``self.arn``.
    2. Merge a Lambda notification entry for ``function_arn`` into the
       bucket's existing notification configuration and write it back.

    The call is safe to repeat: an already-present permission statement is
    tolerated, and an existing notification entry with the same Id is
    replaced rather than duplicated.

    Args:
        function_arn: ARN of the Lambda function to be invoked by the bucket.
    """
    # Add Lambda permission
    try:
        self._lambda.add_permission(
            FunctionName=function_arn,
            StatementId=f"s3-{self.bucket_name}",
            Action="lambda:InvokeFunction",
            Principal="s3.amazonaws.com",
            SourceArn=self.arn,
        )
    except botocore.exceptions.ClientError as e:
        # A conflict means the statement already exists, which is the
        # desired end state; anything else is logged but does not stop
        # the notification setup below.
        if e.response["Error"]["Code"] != "ResourceConflictException":
            LOG.exception("Unable to add Lambda permission for S3")

    # Configure bucket notification
    try:
        # The put below replaces the bucket's whole notification
        # configuration, so the current one must be read successfully
        # first. If the read fails, the error falls through to the outer
        # handler instead of continuing with an empty configuration,
        # which would erase every notification already on the bucket.
        config = self._s3.get_bucket_notification_configuration(Bucket=self.bucket_name)

        # Remove ResponseMetadata if present; it is not part of the
        # notification configuration accepted by the put call.
        config.pop("ResponseMetadata", None)

        notification_id = self._make_notification_id(function_arn)

        # Drop any earlier entry carrying the same Id so that re-adding
        # the same function updates it in place instead of submitting
        # two entries with one Id.
        lambda_configs = [
            existing
            for existing in config.get("LambdaFunctionConfigurations", [])
            if existing.get("Id") != notification_id
        ]

        new_config: Dict[str, Any] = {
            "Id": notification_id,
            "LambdaFunctionArn": function_arn,
            "Events": self.events,
        }

        # Only attach a Filter when at least one key rule is configured.
        filter_rules = []
        if self.prefix:
            filter_rules.append({"Name": "prefix", "Value": self.prefix})
        if self.suffix:
            filter_rules.append({"Name": "suffix", "Value": self.suffix})
        if filter_rules:
            new_config["Filter"] = {"Key": {"FilterRules": filter_rules}}

        lambda_configs.append(new_config)
        config["LambdaFunctionConfigurations"] = lambda_configs

        self._s3.put_bucket_notification_configuration(Bucket=self.bucket_name, NotificationConfiguration=config)
        LOG.debug("Added S3 event source")
    except Exception:
        LOG.exception("Unable to add S3 event source")
| 486 | |
| 487 | def remove(self, function_arn: str) -> bool: |
| 488 | try: |
no test coverage detected