(self, credential_process)
| 1093 | ) |
| 1094 | |
def _retrieve_credentials_using(self, credential_process):
    """Run the credential process and parse the credentials it prints.

    The command string is split into an argument list and launched
    through ``self._popen`` without a shell. On success the process's
    stdout is decoded as UTF-8, parsed as JSON, and mapped onto the
    credential dictionary returned to the caller.

    :param credential_process: The command line (program plus arguments)
        of the external credential process, as a single string.

    :return: A dict with the keys ``access_key``, ``secret_key``,
        ``token``, ``expiry_time`` and ``account_id``. ``token`` and
        ``expiry_time`` are ``None`` when the process output omits
        ``SessionToken`` / ``Expiration``.

    :raises CredentialRetrievalError: If the process exits with a
        non-zero return code, if the payload's ``Version`` is not ``1``,
        or if a required key is missing from the payload.

    Malformed JSON on stdout is not translated here; whatever the JSON
    decoder raises propagates to the caller.
    """
    # We're not using shell=True, so we need to pass the
    # command and all arguments as a list.
    process_list = compat_shell_split(credential_process)
    # NOTE(review): presumably this restores the caller's original
    # LD_LIBRARY_PATH while the child is spawned -- confirm against the
    # helper's definition.
    with original_ld_library_path():
        p = self._popen(
            process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        # stderr is free-form diagnostic text that is not guaranteed to
        # be valid UTF-8. Decode it leniently so an undecodable byte
        # cannot replace the real failure with a UnicodeDecodeError.
        raise CredentialRetrievalError(
            provider=self.METHOD,
            error_msg=stderr.decode('utf-8', errors='replace'),
        )
    # stdout carries the credentials themselves, so it is decoded
    # strictly: silently substituting characters would corrupt secrets.
    parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
    version = parsed.get('Version', '<Version key not provided>')
    if version != 1:
        raise CredentialRetrievalError(
            provider=self.METHOD,
            error_msg=(
                f"Unsupported version '{version}' for credential process "
                "provider, supported versions: 1"
            ),
        )
    try:
        return {
            'access_key': parsed['AccessKeyId'],
            'secret_key': parsed['SecretAccessKey'],
            'token': parsed.get('SessionToken'),
            'expiry_time': parsed.get('Expiration'),
            'account_id': self._get_account_id(parsed),
        }
    except KeyError as e:
        # Keep the original KeyError attached as the explicit cause so
        # the missing key is visible in the traceback.
        raise CredentialRetrievalError(
            provider=self.METHOD,
            error_msg=f"Missing required key in response: {e}",
        ) from e
| 1131 | |
| 1132 | @property |
| 1133 | def _credential_process(self): |
no test coverage detected