MCPcopy Index your code
hub / github.com/Miserlou/Zappa / get_function_for_aws_event

Method get_function_for_aws_event

zappa/handler.py:288–317  ·  view source on GitHub ↗

Get the associated function to execute for a triggered AWS event Support S3, SNS, DynamoDB, kinesis and SQS events

(self, record)

Source from the content-addressed store, hash-verified

286 return result
287
288 def get_function_for_aws_event(self, record):
289 """
290 Get the associated function to execute for a triggered AWS event
291
292 Support S3, SNS, DynamoDB, kinesis and SQS events
293 """
294 if 's3' in record:
295 if ':' in record['s3']['configurationId']:
296 return record['s3']['configurationId'].split(':')[-1]
297
298 arn = None
299 if 'Sns' in record:
300 try:
301 message = json.loads(record['Sns']['Message'])
302 if message.get('command'):
303 return message['command']
304 except ValueError:
305 pass
306 arn = record['Sns'].get('TopicArn')
307 elif 'dynamodb' in record or 'kinesis' in record:
308 arn = record.get('eventSourceARN')
309 elif 'eventSource' in record and record.get('eventSource') == 'aws:sqs':
310 arn = record.get('eventSourceARN')
311 elif 's3' in record:
312 arn = record['s3']['bucket']['arn']
313
314 if arn:
315 return self.settings.AWS_EVENT_MAPPING.get(arn)
316
317 return None
318
319 def get_function_from_bot_intent_trigger(self, event):
320 """

Callers 1

handlerMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected