(blob_object: DiffgramBlobObjectType,
session: Session,
connection_id: int = None,
bucket_name: str = None,
access_token: str = None,
reference_file: File = None)
| 276 | |
| 277 | |
| 278 | def blob_regenerate_url(blob_object: DiffgramBlobObjectType, |
| 279 | session: Session, |
| 280 | connection_id: int = None, |
| 281 | bucket_name: str = None, |
| 282 | access_token: str = None, |
| 283 | reference_file: File = None) -> list[object, dict]: |
| 284 | |
| 285 | if not blob_object.url_signed_blob_path: |
| 286 | return |
| 287 | |
| 288 | strategy = determine_url_regenerate_strategy( |
| 289 | connection_id = connection_id, |
| 290 | bucket_name = bucket_name) |
| 291 | |
| 292 | should_regenerate, new_offset_in_seconds = data_tools.determine_if_should_regenerate_url(blob_object, session) |
| 293 | if should_regenerate is not True and strategy != 'connection': |
| 294 | return |
| 295 | |
| 296 | logger.debug(f"Regenerating with {strategy} strategy") |
| 297 | |
| 298 | if strategy == "default": |
| 299 | blob_object, log = default_url_regenerate( |
| 300 | session = session, |
| 301 | blob_object = blob_object, |
| 302 | new_offset_in_seconds = new_offset_in_seconds |
| 303 | ) |
| 304 | |
| 305 | if strategy == "connection": |
| 306 | logger.debug(f'Generate Signed Url with connection {connection_id} on bucket {bucket_name}') |
| 307 | blob_object, log = connection_url_regenerate( |
| 308 | session = session, |
| 309 | blob_object = blob_object, |
| 310 | connection_id = connection_id, |
| 311 | bucket_name = bucket_name, |
| 312 | new_offset_in_seconds = new_offset_in_seconds, |
| 313 | reference_file = reference_file, |
| 314 | access_token = access_token |
| 315 | ) |
| 316 | |
| 317 | if regular_log.log_has_error(log): |
| 318 | logger.error(f'Failed to regenerate Blob URL {log}') |
| 319 | blob_object.url_signed = None |
| 320 | blob_object.error = str(log) |
| 321 | session.add(blob_object) |
| 322 | return blob_object, log |
| 323 | |
| 324 | return blob_object, log |
| 325 | |
| 326 | |
| 327 | def determine_url_regenerate_strategy(connection_id, |
no test coverage detected