MCPcopy
hub / github.com/diffgram/diffgram / Data_tools

Class Data_tools

shared/data_tools_core.py:18–66  ·  view source on GitHub ↗

Factory class for the Data_tools implementation. Depending on the value of settings.DIFFGRAM_STATIC_STORAGE_PROVIDER, this class will instantiate a different cloud provider implementation for data tools, which will manage the upload and download of blobs from the cloud storage provider.

Source from the content-addressed store, hash-verified

16
17
class Data_tools(metaclass = Singleton):
    """
    Factory class for the Data_tools implementation.

    Depending on the value of settings.DIFFGRAM_STATIC_STORAGE_PROVIDER,
    this class instantiates a different cloud provider implementation for
    data tools, which manages the upload and download of blobs from the
    cloud storage provider. The chosen implementation is exposed as
    ``self.data_tools``.
    """

    def __init__(self):
        """
        Select and instantiate the provider-specific data tools implementation.

        Raises:
            ValueError: if settings.DIFFGRAM_STATIC_STORAGE_PROVIDER is unset/empty,
                or is not one of the supported values (gcp, aws, minio, azure).
        """
        provider = settings.DIFFGRAM_STATIC_STORAGE_PROVIDER

        if not provider:
            raise ValueError("No DIFFGRAM_STATIC_STORAGE_PROVIDER env var set. valid values are [gcp, aws, minio, azure]")

        # An if/elif chain (rather than a dict of classes) so that only the
        # selected implementation's name is looked up at runtime.
        if provider == 'gcp':
            self.data_tools = DataToolsGCP()
        elif provider == 'aws':
            self.data_tools = DataToolsS3()
        elif provider == 'minio':
            self.data_tools = DataToolsMinio()
        elif provider == 'azure':
            self.data_tools = DataToolsAzure()
        else:
            # Fail fast: without this branch an unrecognized provider left
            # `self.data_tools` unset and only failed later with AttributeError.
            raise ValueError(
                "Unsupported DIFFGRAM_STATIC_STORAGE_PROVIDER {!r}. "
                "valid values are [gcp, aws, minio, azure]".format(provider)
            )

    def determine_if_should_regenerate_url(self,
                                           blob_object: DiffgramBlobObjectType,
                                           session: Session,
                                           url_signed_expiry: int = None):
        """
        Decide whether the signed URL for ``blob_object`` should be regenerated.

        Args:
            blob_object: object carrying a ``url_signed_expiry`` attribute, used
                as the fallback expiry when ``url_signed_expiry`` is not given.
            session: database session; when falsy the answer is always "no".
            url_signed_expiry: optional expiry timestamp overriding the one
                stored on ``blob_object``.

        Returns:
            A 2-tuple ``(should_regenerate, new_offset_in_seconds)``.
            ``(False, None)`` when no session is supplied; otherwise
            ``(True, settings.SIGNED_URL_CACHE_NEW_OFFSET_SECONDS_VALID)``.
        """
        if not session:
            return False, None

        # NOTE(review): this unconditional return forces regeneration on every
        # call that has a session, so the expiry-based checks below never run.
        # It is preserved as-is because callers currently rely on this result;
        # confirm whether the bypass is intentional before removing it.
        return True, settings.SIGNED_URL_CACHE_NEW_OFFSET_SECONDS_VALID

        # ---- currently unreachable: expiry-based regeneration policy ----
        minimum_secs_valid = settings.SIGNED_URL_CACHE_MINIMUM_SECONDS_VALID
        new_offset_in_seconds = settings.SIGNED_URL_CACHE_NEW_OFFSET_SECONDS_VALID
        time_to_check = time.time() + minimum_secs_valid

        # Fall back to the expiry stored on the blob when none was passed in.
        if url_signed_expiry is None:
            url_signed_expiry = blob_object.url_signed_expiry

        # No known expiry at all: a URL has to be generated.
        if url_signed_expiry is None:
            return True, new_offset_in_seconds

        # Use the resolved value (explicit argument or blob fallback); the
        # previous code re-read blob_object here and discarded the argument.
        url_signed_expiry = int(float(url_signed_expiry))
        if url_signed_expiry <= settings.URL_SIGNED_REFRESH + (new_offset_in_seconds):
            return True, new_offset_in_seconds

        # Expires within the minimum validity window from now.
        if url_signed_expiry <= time_to_check:
            return True, new_offset_in_seconds

        return False, None
67
68
# Module-level Data_tools instance, constructed when this module is imported.
# NOTE(review): Data_tools is declared with metaclass=Singleton, so this is
# presumably the one shared instance — confirm against Singleton's definition.
69data_tools_instance = Data_tools()

Callers 15

main.pyFile · 0.90
set_logo.pyFile · 0.90
get_system_logo.pyFile · 0.90
images_core.pyFile · 0.90
append_to_batch.pyFile · 0.90
overlayMethod · 0.90
__init__Method · 0.90
upload_video_fileMethod · 0.90
process_media.pyFile · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected