Resolve context header mappings from the API Gateway requestContext and add them to the headers dict (mutates in place).
(
event_info: Dict[str, Any],
headers: Dict[str, Any],
context_header_mappings: Optional[Dict[str, str]],
)
| 910 | |
| 911 | |
def resolve_context_headers(
    event_info: Dict[str, Any],
    headers: Dict[str, Any],
    context_header_mappings: Optional[Dict[str, str]],
) -> None:
    """
    Resolve context header mappings from the API Gateway requestContext
    and add them to the headers dict (mutates in place).

    Args:
        event_info: Event dict; only its ``"requestContext"`` entry is read.
        headers: Destination dict. Each successfully resolved mapping is
            written as ``headers[header_name] = resolved_value``, overwriting
            any existing entry with the same name.
        context_header_mappings: Maps a header name to a dot-separated path
            inside ``requestContext`` (e.g. ``"authorizer.claims.sub"``).
            ``None`` or an empty dict makes this function a no-op.

    A mapping is skipped (no header written, no exception raised) when any
    path segment is missing, when an intermediate value is not a dict and
    therefore cannot be descended into, or when the resolved value is
    ``None``. Other falsy values (``""``, ``0``, ``False``) are still written.
    """
    if not context_header_mappings:
        return
    request_context = event_info.get("requestContext")
    if not request_context:
        return
    for header_name, context_path in context_header_mappings.items():
        header_val: Any = request_context
        for part in context_path.split("."):
            # Only dicts can be descended into. Without the isinstance guard,
            # a null intermediate raises TypeError on the `in` test, and a
            # string intermediate does substring matching and then fails on
            # string indexing.
            if not isinstance(header_val, dict) or part not in header_val:
                header_val = None
                break
            header_val = header_val[part]
        if header_val is not None:
            headers[header_name] = header_val
| 936 | |
| 937 | |
| 938 | def merge_headers(event: Dict[str, Any]) -> Dict[str, str]: |
no outgoing calls
no test coverage detected