Generic function to process cleanup tasks. Args: queryset_func: Function that returns the queryset to process transform_func: Function to transform each record for MongoDB model: Django model class task_name: Name of the task for logging collection_n
(
queryset_func: Callable,
transform_func: Callable[[Dict], Dict],
model,
task_name: str,
collection_name: str,
)
| 90 | |
| 91 | |
| 92 | def process_cleanup_task( |
| 93 | queryset_func: Callable, |
| 94 | transform_func: Callable[[Dict], Dict], |
| 95 | model, |
| 96 | task_name: str, |
| 97 | collection_name: str, |
| 98 | ): |
| 99 | """ |
| 100 | Generic function to process cleanup tasks. |
| 101 | |
| 102 | Args: |
| 103 | queryset_func: Function that returns the queryset to process |
| 104 | transform_func: Function to transform each record for MongoDB |
| 105 | model: Django model class |
| 106 | task_name: Name of the task for logging |
| 107 | collection_name: MongoDB collection name |
| 108 | """ |
| 109 | logger.info(f"Starting {task_name} cleanup task") |
| 110 | |
| 111 | # Get MongoDB collection |
| 112 | mongo_collection = get_mongo_collection(collection_name) |
| 113 | mongo_available = mongo_collection is not None |
| 114 | |
| 115 | # Get queryset |
| 116 | queryset = queryset_func() |
| 117 | |
| 118 | # Process records in batches |
| 119 | buffer: List[Dict[str, Any]] = [] |
| 120 | ids_to_delete: List[int] = [] |
| 121 | total_processed = 0 |
| 122 | total_batches = 0 |
| 123 | |
| 124 | for record in queryset: |
| 125 | # Transform record for MongoDB |
| 126 | buffer.append(transform_func(record)) |
| 127 | ids_to_delete.append(record["id"]) |
| 128 | |
| 129 | # Flush batch when it reaches BATCH_SIZE |
| 130 | if len(buffer) >= BATCH_SIZE: |
| 131 | total_batches += 1 |
| 132 | flush_to_mongo_and_delete( |
| 133 | mongo_collection=mongo_collection, |
| 134 | buffer=buffer, |
| 135 | ids_to_delete=ids_to_delete, |
| 136 | model=model, |
| 137 | mongo_available=mongo_available, |
| 138 | ) |
| 139 | total_processed += len(buffer) |
| 140 | buffer.clear() |
| 141 | ids_to_delete.clear() |
| 142 | |
| 143 | # Process final batch if any records remain |
| 144 | if buffer: |
| 145 | total_batches += 1 |
| 146 | flush_to_mongo_and_delete( |
| 147 | mongo_collection=mongo_collection, |
| 148 | buffer=buffer, |
| 149 | ids_to_delete=ids_to_delete, |
no test coverage detected