| 485 | LOG.exception("Unable to add S3 event source") |
| 486 | |
def remove(self, function_arn: str) -> bool:
    """Detach this bucket as an S3 event source of a Lambda function.

    Two steps are performed, in order:

    1. Remove the Lambda permission whose statement id is
       ``s3-<bucket_name>``. This step is best-effort: a
       ``botocore`` ``ClientError`` is logged at debug level and
       otherwise ignored so that the notification cleanup still runs.
    2. Read the bucket's notification configuration, drop the Lambda
       entry whose ``Id`` equals ``self._make_notification_id(function_arn)``,
       and write the configuration back.

    Args:
        function_arn: Identifier of the Lambda function, passed as
            ``FunctionName`` to ``remove_permission`` and used to derive
            the notification id.

    Returns:
        ``True`` when the notification configuration was written back,
        ``False`` when any other exception occurred (the exception is
        logged, never propagated).
    """
    try:
        # Remove Lambda permission (best-effort).
        statement_id = f"s3-{self.bucket_name}"
        try:
            self._lambda.remove_permission(FunctionName=function_arn, StatementId=statement_id)
        except botocore.exceptions.ClientError:
            # Previously swallowed silently; keep going, but leave a trace
            # so unexpected failures (not just a missing statement) can be
            # diagnosed.
            LOG.debug(
                "Could not remove Lambda permission %s from %s; continuing",
                statement_id,
                function_arn,
                exc_info=True,
            )

        # Remove bucket notification.
        config = self._s3.get_bucket_notification_configuration(Bucket=self.bucket_name)
        notification_id = self._make_notification_id(function_arn)
        config["LambdaFunctionConfigurations"] = [
            entry
            for entry in config.get("LambdaFunctionConfigurations", [])
            if entry.get("Id") != notification_id
        ]
        # The GET response is reused as the PUT payload, so strip the
        # response-only metadata key before sending it back.
        config.pop("ResponseMetadata", None)
        self._s3.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration=config,
        )
        LOG.debug("Removed S3 event source")
        return True
    except Exception:
        LOG.exception("Unable to remove S3 event source")
        return False
| 509 | |
| 510 | def status(self, function_arn: str) -> Optional[Dict[str, Any]]: |
| 511 | try: |