Factory class for the Data_tools implementation. Depending on the value of settings.DIFFGRAM_STATIC_STORAGE_PROVIDER, this class will instantiate a different cloud provider implementation for data tools, which will manage the upload and download of blobs from the cloud storage provider.
| 16 | |
| 17 | |
class Data_tools(metaclass = Singleton):
    """
    Factory class for the Data_tools implementation.

    Depending on the value of settings.DIFFGRAM_STATIC_STORAGE_PROVIDER,
    this class instantiates a different cloud provider implementation for
    data tools, which manages the upload and download of blobs from the
    cloud storage provider. The chosen implementation is exposed as
    ``self.data_tools``.
    """

    def __init__(self):
        """
        Select and instantiate the provider-specific data tools.

        Raises:
            ValueError: if settings.DIFFGRAM_STATIC_STORAGE_PROVIDER is unset
                (falsy) or is not one of the supported provider names.
        """
        provider = settings.DIFFGRAM_STATIC_STORAGE_PROVIDER

        if not provider:
            raise ValueError(
                "No DIFFGRAM_STATIC_STORAGE_PROVIDER env var set. "
                "valid values are [gcp, aws, minio, azure]"
            )

        # Names are resolved lazily, branch by branch, so only the selected
        # provider's class is looked up.
        if provider == 'gcp':
            self.data_tools = DataToolsGCP()
        elif provider == 'aws':
            self.data_tools = DataToolsS3()
        elif provider == 'minio':
            self.data_tools = DataToolsMinio()
        elif provider == 'azure':
            self.data_tools = DataToolsAzure()
        else:
            # Fail fast: without this branch an unrecognised value left
            # self.data_tools unset and the error surfaced later as an
            # AttributeError far from the misconfiguration.
            raise ValueError(
                "Invalid DIFFGRAM_STATIC_STORAGE_PROVIDER {!r}. "
                "valid values are [gcp, aws, minio, azure]".format(provider)
            )

    def determine_if_should_regenerate_url(self,
                                           blob_object: DiffgramBlobObjectType,
                                           session: Session,
                                           url_signed_expiry: int = None):
        """
        Decide whether a signed URL should be regenerated.

        Args:
            blob_object: the blob whose signed URL is being considered.
                Currently not consulted (see note below).
            session: database session; when falsy, no regeneration is
                requested.
            url_signed_expiry: optional expiry of the existing signed URL.
                Currently not consulted (see note below).

        Returns:
            A ``(should_regenerate, new_offset_in_seconds)`` tuple:
            ``(False, None)`` when ``session`` is falsy, otherwise
            ``(True, settings.SIGNED_URL_CACHE_NEW_OFFSET_SECONDS_VALID)``.

        Note:
            Expiry-based reuse of signed URLs is disabled: this method has
            always returned before evaluating the expiry, so the checks that
            followed the return were unreachable and have been removed.
            ``blob_object`` and ``url_signed_expiry`` are kept in the
            signature so existing callers keep working.
        """
        if not session:
            return False, None

        return True, settings.SIGNED_URL_CACHE_NEW_OFFSET_SECONDS_VALID
| 68 | |
# Module-level instance, constructed when this module is imported; construction
# raises ValueError if settings.DIFFGRAM_STATIC_STORAGE_PROVIDER is not set.
# NOTE(review): Data_tools uses the Singleton metaclass, so later Data_tools()
# calls presumably return this same object — confirm against Singleton.
data_tools_instance = Data_tools()
no outgoing calls
no test coverage detected