(self, task_id)
| 183 | self.f and self.f.flush() |
| 184 | |
def handle_task_start(self, task_id):
    """Open the log file for ``task_id`` in append mode and keep it on ``self.f``.

    The path is obtained from ``get_celery_task_log_path(task_id)``. If a
    handle from an earlier start is still set on the instance, it is closed
    first so that rebinding ``self.f`` does not leak an open file.
    """
    # getattr: tolerate instances on which ``f`` has not been assigned yet.
    previous = getattr(self, 'f', None)
    if previous:
        previous.close()
        # Drop the reference right away so a failing open() below cannot
        # leave a closed handle behind.
        self.f = None
    log_path = get_celery_task_log_path(task_id)
    self.f = open(log_path, 'a')
| 188 | |
def handle_task_end(self, task_id):
    """Close the current log file, if any, and clear ``self.f``.

    ``task_id`` is accepted for interface symmetry and is not used here.

    The handle is reset to ``None`` after closing because a closed file
    object is still truthy: guards of the form ``self.f and self.f.flush()``
    would otherwise call into a closed file and raise ``ValueError``.
    """
    if self.f:
        try:
            self.f.close()
        finally:
            # Clear the reference even if close() fails, so later calls
            # never see a stale handle.
            self.f = None
nothing calls this directly
no test coverage detected