nolint:nestif
( ctx context.Context, e *events.Event[*asyncprocessing.ExecuteAsyncTaskPayload], task *types.Task, processingErr error, )
| 340 | |
| 341 | //nolint:nestif |
| 342 | func (s *Service) finalStatusUpdate( |
| 343 | ctx context.Context, |
| 344 | e *events.Event[*asyncprocessing.ExecuteAsyncTaskPayload], |
| 345 | task *types.Task, |
| 346 | processingErr error, |
| 347 | ) (bool, error) { |
| 348 | var runAgain bool |
| 349 | err := s.tx.WithTx( |
| 350 | ctx, func(ctx context.Context) error { |
| 351 | _, err := s.taskRepository.LockForUpdate(ctx, task) |
| 352 | if err != nil { |
| 353 | log.Ctx(ctx).Error().Msgf("failed to lock task [%s] for update: %v", task.Key, err) |
| 354 | return fmt.Errorf("failed to lock task for update: %w", err) |
| 355 | } |
| 356 | if processingErr != nil { |
| 357 | log.Error().Ctx(ctx).Msgf("processing error for task [%s]: %v", task.Key, processingErr) |
| 358 | err = s.taskSourceRepository.UpdateSourceStatus(ctx, e.ID, types.TaskStatusFailure, processingErr.Error()) |
| 359 | if err != nil { |
| 360 | return err |
| 361 | } |
| 362 | runAgain, err = s.taskRepository.CompleteTask(ctx, task.Key, types.TaskStatusFailure) |
| 363 | if err != nil { |
| 364 | return err |
| 365 | } |
| 366 | } else { |
| 367 | err = s.taskSourceRepository.UpdateSourceStatus(ctx, e.ID, types.TaskStatusSuccess, "") |
| 368 | if err != nil { |
| 369 | return err |
| 370 | } |
| 371 | runAgain, err = s.taskRepository.CompleteTask(ctx, task.Key, types.TaskStatusSuccess) |
| 372 | if err != nil { |
| 373 | return err |
| 374 | } |
| 375 | } |
| 376 | return err |
| 377 | }) |
| 378 | if err != nil { |
| 379 | return false, fmt.Errorf("failed to update final statuses of task and sources, eventID:%s, task key: %s, err: %w", |
| 380 | e.ID, task.Key, err) |
| 381 | } |
| 382 | return runAgain, nil |
| 383 | } |
| 384 | |
| 385 | func (s *Service) ProcessingStatusUpdate(ctx context.Context, task *types.Task, runID string) error { |
| 386 | err := s.tx.WithTx( |
no test coverage detected