MCPcopy
hub / github.com/diffgram/diffgram / validate_gcp_connection

Method validate_gcp_connection

install.py:194–251  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

192 return True
193
def validate_gcp_connection(self):
    """Check that the configured GCP credentials can use the configured bucket.

    Three checks run in order, each printing an ``[OK]`` or ``[ERROR]``
    status line:

    1. Connection: load the service-account file at
       ``self.gcp_credentials_path`` and fetch the bucket named
       ``self.bucket_name``.
    2. Write: upload a small text object named ``diffgram_test_file.txt``.
    3. Read: generate a signed URL for that object and fetch it over HTTP,
       expecting status 200.

    The test object is not deleted afterwards.

    Returns:
        bool: ``True`` when all three checks pass; ``False`` as soon as one
        fails (after printing the traceback and a hint for the user).
    """
    account_path = self.gcp_credentials_path
    bucket_name = self.bucket_name
    test_file_path = 'diffgram_test_file.txt'
    bcolors.printcolor('Testing Connection...', bcolors.OKBLUE)

    # Check 1: credentials load and the bucket is reachable.
    try:
        # Opening the file surfaces a missing/unreadable path early; the
        # context manager ensures the handle is closed again (it used to leak).
        with open(account_path, mode='r'):
            pass
        credentials = service_account.Credentials.from_service_account_file(account_path)
        client = storage.Client(credentials=credentials)
        bucket = client.get_bucket(bucket_name)
        print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Connection To GCP Account")
    except Exception:
        print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Connection To GCP Account")
        bcolors.printcolor('Error Connecting to GCP: Please check you entered valid credentials.', bcolors.FAIL)
        print(f"Details: {traceback.format_exc()}")
        bcolors.printcolor('Please update credentials and try again', bcolors.OKBLUE)
        return False
    time.sleep(0.5)

    # Check 2: the credentials may write objects to the bucket.
    try:
        blob = bucket.blob(test_file_path)
        blob.upload_from_string('This is a diffgram test file', content_type='text/plain')
        print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Write Permissions")
    except Exception:
        # `Exception` rather than a bare except, so Ctrl-C / SystemExit
        # propagate instead of being reported as a permissions problem.
        print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Write Permissions")
        bcolors.printcolor('Error Connecting to GCP: Please check you have write permissions on the GCP bucket.',
                           bcolors.FAIL)
        print(f"Details: {traceback.format_exc()}")
        bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE)
        return False
    time.sleep(0.5)

    # Check 3: the uploaded object can be read back through a signed URL.
    try:
        expiration_offset = 40368000  # seconds (roughly 467 days)
        expiration_time = int(time.time() + expiration_offset)

        filename = test_file_path.split("/")[-1]
        # Reuses the blob created by the write check above.
        url_signed = blob.generate_signed_url(
            expiration=expiration_time,
            response_disposition=f"attachment; filename={filename}"
        )
        resp = requests.get(url_signed)
        if resp.status_code != 200:
            raise Exception(
                f"Error when accessing presigned URL: Status({resp.status_code}). Error: {resp.text}")

        print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Read Permissions")
    except Exception:
        print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Read Permissions")
        bcolors.printcolor('Error Connecting to GCP: Please check you have read permissions on the GCP bucket.',
                           bcolors.FAIL)
        print(f"Details: {traceback.format_exc()}")
        bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE)
        return False
    time.sleep(0.5)

    bcolors.printcolor('Connection to GCP Successful!', bcolors.OKGREEN)
    return True

Callers 1

installMethod · 0.95

Calls 4

printcolorMethod · 0.80
splitMethod · 0.80
upload_from_stringMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected