MCPcopy
hub / github.com/diffgram/diffgram / validate_s3_connection

Method validate_s3_connection

install.py:253–316  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

251 return True
252
253 def validate_s3_connection(self):
254 if not self.use_docker_minio or self.static_storage_provider == 'aws':
255 endpoint_url = self.s3_endpoint_url
256 else:
257 endpoint_url = 'http://localhost:9000'
258 access_id = self.s3_access_id
259 access_secret = self.s3_access_secret
260 bucket_name = self.bucket_name
261 bucket_region = self.bucket_region
262 is_aws_signature_v4 = self.is_aws_signature_v4
263 test_file_path = 'diffgram_test_file.txt'
264 client = None
265 bcolors.printcolor('Testing Connection...', bcolors.OKBLUE)
266 try:
267 if is_aws_signature_v4:
268 client = boto3.client('s3', config=Config(signature_version='s3v4'), endpoint_url=endpoint_url,
269 aws_access_key_id=access_id, aws_secret_access_key=access_secret, region_name=bucket_region)
270 else:
271 client = boto3.client('s3', endpoint_url=endpoint_url, aws_access_key_id=access_id, aws_secret_access_key=access_secret, region_name=bucket_region)
272 print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Connection To Storage")
273 except Exception as e:
274 print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Connection To Storage")
275 bcolors.printcolor('Error Connecting to S3: Please check you entered valid credentials.', bcolors.FAIL)
276 print(f"Details: {traceback.format_exc()}")
277 bcolors.printcolor('Please update credentials and try again', bcolors.OKBLUE)
278 return False
279 time.sleep(0.5)
280 try:
281 client.put_object(Body = 'This is a diffgram test file',
282 Bucket = bucket_name,
283 Key = test_file_path,
284 ContentType = 'text/plain')
285 print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Write Permissions")
286 except:
287 print(f"{bcolors.FAIL}[ERROR] {bcolors.ENDC}Write Permissions")
288 bcolors.printcolor('Error Connecting to storage bucket: Please check you have write permissions on the bucket.',
289 bcolors.FAIL)
290 print(f"Details: {traceback.format_exc()}")
291 bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE)
292 return False
293 time.sleep(0.5)
294 try:
295 signed_url = client.generate_presigned_url('get_object',
296 Params = {'Bucket': bucket_name, 'Key': test_file_path},
297 ExpiresIn = 3600 * 24 * 6)
298 resp = requests.get(signed_url)
299 if resp.status_code != 200:
300 raise Exception(
301 f"Error when accessing presigned URL: Status({resp.status_code}). Error: {resp.text}")
302
303 print(f"{bcolors.OKGREEN}[OK] {bcolors.ENDC}Read Permissions")
304 except:
305 print(f"{bcolors.WARNING}[ERROR] {bcolors.ENDC}Read Permissions")
306 bcolors.printcolor('Error Connecting to S3: Please check you have read permissions on the S3 bucket.',
307 bcolors.WARNING)
308 print(f"Details: {traceback.format_exc()}")
309 bcolors.printcolor('Please update permissions and try again', bcolors.OKBLUE)
310 return False

Callers 1

install · Method · 0.95

Calls 2

printcolor · Method · 0.80
get · Method · 0.45

Tested by

no test coverage detected