assembleStatusFromMetadata reads per-node status files from the configured backup backend and assembles the overall status using the metadata's NodeAssignments. The returned allTerminal flag is true when every node has written a status file with a terminal status (Success or Failed), distinguishing genuine completions from liveness-inferred failures.
( ctx context.Context, backend modulecapabilities.BackupBackend, id, bucket, path string, meta *ExportMetadata, )
| 527 | // written a status file with a terminal status (Success or Failed), |
| 528 | // distinguishing genuine completions from liveness-inferred failures. |
| 529 | func (s *Scheduler) assembleStatusFromMetadata( |
| 530 | ctx context.Context, |
| 531 | backend modulecapabilities.BackupBackend, |
| 532 | id, bucket, path string, |
| 533 | meta *ExportMetadata, |
| 534 | ) (_ *models.ExportStatusResponse, allTerminal bool, _ error) { |
| 535 | homePath := backend.HomeDir(id, bucket, path) |
| 536 | |
| 537 | // Read per-node statuses and apply liveness overrides for non-terminal nodes. |
| 538 | // The status file is read up to twice per node. The export goroutine writes |
| 539 | // the terminal status and then clears its activeExport flag. An IsRunning |
| 540 | // check that lands between those two events sees "not running" while the |
| 541 | // status file still shows Transferring/Started. A single re-read is |
| 542 | // sufficient because stopWriter blocks until the write completes before |
| 543 | // clearAndRelease clears activeExport, so the file is guaranteed to be on |
| 544 | // disk when IsRunning returns false. |
| 545 | const maxStatusReads = 2 |
| 546 | nodeStatuses := make(map[string]*NodeStatus, len(meta.NodeAssignments)) |
| 547 | for nodeName := range meta.NodeAssignments { |
| 548 | var nodeStatus *NodeStatus |
| 549 | for attempt := range maxStatusReads { |
| 550 | ns, err := readNodeStatus(ctx, backend, id, bucket, path, nodeName) |
| 551 | if err != nil { |
| 552 | if !errors.As(err, &backup.ErrNotFound{}) { |
| 553 | return nil, false, fmt.Errorf("get status for node %s: %w", nodeName, err) |
| 554 | } |
| 555 | ns = &NodeStatus{ |
| 556 | NodeName: nodeName, |
| 557 | Status: export.Transferring, |
| 558 | ShardProgress: make(map[string]map[string]*ShardProgress), |
| 559 | } |
| 560 | } |
| 561 | |
| 562 | // For non-terminal nodes, verify liveness and override to Failed if unreachable. |
| 563 | switch ns.Status { |
| 564 | case export.Success, export.Failed: |
| 565 | // Terminal — no liveness check needed. |
| 566 | nodeStatus = ns |
| 567 | default: |
| 568 | host, alive := s.nodeResolver.NodeHostname(nodeName) |
| 569 | if !alive { |
| 570 | ns.Status = export.Failed |
| 571 | if ns.Error == "" { |
| 572 | ns.Error = fmt.Sprintf("node %s is no longer part of the cluster", nodeName) |
| 573 | } |
| 574 | nodeStatus = ns |
| 575 | } else { |
| 576 | running, runErr := s.client.IsRunning(ctx, host, id) |
| 577 | if runErr != nil || !running { |
| 578 | if attempt < maxStatusReads-1 { |
| 579 | // Re-read the status file — the goroutine may |
| 580 | // have written the terminal status by now. |
| 581 | continue |
| 582 | } |
| 583 | ns.Status = export.Failed |
| 584 | if ns.Error == "" { |
| 585 | ns.Error = fmt.Sprintf("node %s is no longer running export %s", nodeName, id) |
| 586 | } |