(self)
| 192 | return True |
| 193 | |
| 194 | def validate_gcp_connection(self): |
| 195 | account_path = self.gcp_credentials_path |
| 196 | bucket_name = self.bucket_name |
| 197 | test_file_path = 'diffgram_test_file.txt' |
| 198 | client = None |
| 199 | bucket = None |
| 200 | bcolors.printcolor('Testing Connection...', bcolors.OKBLUE) |
| 201 | try: |
| 202 | file = open(account_path, mode = 'r') |
| 203 | credentials = service_account.Credentials.from_service_account_file(account_path) |
| 204 | client = storage.Client(credentials = credentials) |
| 205 | bucket = client.get_bucket(bucket_name) |
| 206 | print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Connection To GCP Account") |
| 207 | except Exception as e: |
| 208 | print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Connection To GCP Account") |
| 209 | bcolors.printcolor('Error Connecting to GCP: Please check you entered valid credentials.', bcolors.FAIL) |
| 210 | print(f"Details: {traceback.format_exc()}") |
| 211 | bcolors.printcolor('Please update credentials and try again', bcolors.OKBLUE) |
| 212 | return False |
| 213 | time.sleep(0.5) |
| 214 | try: |
| 215 | blob = bucket.blob(test_file_path) |
| 216 | blob.upload_from_string('This is a diffgram test file', content_type = 'text/plain') |
| 217 | print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Write Permissions") |
| 218 | except: |
| 219 | print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Write Permissions") |
| 220 | bcolors.printcolor('Error Connecting to GCP: Please check you have write permissions on the GCP bucket.', |
| 221 | bcolors.FAIL) |
| 222 | print(f"Details: {traceback.format_exc()}") |
| 223 | bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE) |
| 224 | return False |
| 225 | time.sleep(0.5) |
| 226 | try: |
| 227 | expiration_offset = 40368000 |
| 228 | expiration_time = int(time.time() + expiration_offset) |
| 229 | bucket.blob(test_file_path) |
| 230 | |
| 231 | filename = test_file_path.split("/")[-1] |
| 232 | url_signed = blob.generate_signed_url( |
| 233 | expiration = expiration_time, |
| 234 | response_disposition = f"attachment; filename={filename}" |
| 235 | ) |
| 236 | resp = requests.get(url_signed) |
| 237 | if resp.status_code != 200: |
| 238 | raise Exception( |
| 239 | f"Error when accessing presigned URL: Status({resp.status_code}). Error: {resp.text}") |
| 240 | |
| 241 | print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Read Permissions") |
| 242 | except: |
| 243 | print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Read Permissions") |
| 244 | bcolors.printcolor('Error Connecting to GCP: Please check you have read permissions on the GCP bucket.', |
| 245 | bcolors.FAIL) |
| 246 | print(f"Details: {traceback.format_exc()}") |
| 247 | bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE) |
| 248 | return False |
| 249 | time.sleep(0.5) |
| 250 | bcolors.printcolor('Connection to GCP Successful!', bcolors.OKGREEN) |
| 251 | return True |
no test coverage detected