(task_id: str, progress: float, message: Union[str, None] = None)
| 39 | |
| 40 | |
def set_progress(task_id: str, progress: float, message: Union[str, None] = None):
    """Store the progress of a task in Redis and echo it to stdout.

    The status is written as a JSON object with the keys ``"progress"`` and
    ``"message"`` under the key returned by ``get_progress_key(task_id)``,
    with an expiry of ``DEFAULT_EXPIRY_SECONDS``.

    Args:
        task_id: Identifier of the task. A falsy value makes the call a
            no-op (not run as API).
        progress: Progress value; printed as a percentage via
            ``progress * 100``.
        message: Status text. When ``None``, the message already stored for
            the task is reused; if nothing is stored, or the lookup fails,
            ``"initial message"`` is used instead.

    Returns:
        None. Redis and serialization failures are printed, never raised.
    """
    # Nothing to do without a Redis connection or a task id.
    if not (redis_client and task_id):
        return

    redis_key = get_progress_key(task_id)

    if message is None:
        # Start from the fallback and only replace it when a previously
        # stored status can be read and decoded successfully.
        message = "initial message"
        try:
            stored = redis_client.get(redis_key)
            if stored:
                message = json.loads(stored).get("message", "initial message")
        except Exception as err:
            print(
                f"ERROR: Failed to retrieve existing message from Redis for {task_id}: {err}"
            )

    try:
        # Serialization and the progress print stay inside the try so that
        # any failure here is reported rather than propagated.
        redis_client.set(
            redis_key,
            json.dumps({"progress": progress, "message": message}),
            ex=DEFAULT_EXPIRY_SECONDS,
        )
        print(f"Task {task_id}: {progress*100:.2f}% complete - {message}")
    except Exception as err:
        print(f"ERROR: Failed to set progress in Redis for {task_id}: {err}")
| 69 | |
| 70 | |
| 71 | def get_progress( |
no test coverage detected