MCPcopy
hub / github.com/weaviate/weaviate / assembleStatusFromMetadata

Method assembleStatusFromMetadata

usecases/export/scheduler.go:529–599  ·  view source on GitHub ↗

assembleStatusFromMetadata reads per-node status files from the configured backup backend and assembles overall status using the metadata's NodeAssignments. The returned allTerminal flag is true when every node has written a status file with a terminal status (Success or Failed), distinguishing genuine completions from liveness-inferred failures.

(
	ctx context.Context,
	backend modulecapabilities.BackupBackend,
	id, bucket, path string,
	meta *ExportMetadata,
)

Source from the content-addressed store, hash-verified

527// written a status file with a terminal status (Success or Failed),
528// distinguishing genuine completions from liveness-inferred failures.
529func (s *Scheduler) assembleStatusFromMetadata(
530 ctx context.Context,
531 backend modulecapabilities.BackupBackend,
532 id, bucket, path string,
533 meta *ExportMetadata,
534) (_ *models.ExportStatusResponse, allTerminal bool, _ error) {
535 homePath := backend.HomeDir(id, bucket, path)
536
537 // Read per-node statuses and apply liveness overrides for non-terminal nodes.
538 // The status file is read up to twice per node. The export goroutine writes
539 // the terminal status and then clears its activeExport flag. An IsRunning
540 // check that lands between those two events sees "not running" while the
541 // status file still shows Transferring/Started. A single re-read is
542 // sufficient because stopWriter blocks until the write completes before
543 // clearAndRelease clears activeExport, so the file is guaranteed to be on
544 // disk when IsRunning returns false.
545 const maxStatusReads = 2
546 nodeStatuses := make(map[string]*NodeStatus, len(meta.NodeAssignments))
547 for nodeName := range meta.NodeAssignments {
548 var nodeStatus *NodeStatus
549 for attempt := range maxStatusReads {
550 ns, err := readNodeStatus(ctx, backend, id, bucket, path, nodeName)
551 if err != nil {
552 if !errors.As(err, &backup.ErrNotFound{}) {
553 return nil, false, fmt.Errorf("get status for node %s: %w", nodeName, err)
554 }
555 ns = &NodeStatus{
556 NodeName: nodeName,
557 Status: export.Transferring,
558 ShardProgress: make(map[string]map[string]*ShardProgress),
559 }
560 }
561
562 // For non-terminal nodes, verify liveness and override to Failed if unreachable.
563 switch ns.Status {
564 case export.Success, export.Failed:
565 // Terminal — no liveness check needed.
566 nodeStatus = ns
567 default:
568 host, alive := s.nodeResolver.NodeHostname(nodeName)
569 if !alive {
570 ns.Status = export.Failed
571 if ns.Error == "" {
572 ns.Error = fmt.Sprintf("node %s is no longer part of the cluster", nodeName)
573 }
574 nodeStatus = ns
575 } else {
576 running, runErr := s.client.IsRunning(ctx, host, id)
577 if runErr != nil || !running {
578 if attempt < maxStatusReads-1 {
579 // Re-read the status file — the goroutine may
580 // have written the terminal status by now.
581 continue
582 }
583 ns.Status = export.Failed
584 if ns.Error == "" {
585 ns.Error = fmt.Sprintf("node %s is no longer running export %s", nodeName, id)
586 }

Calls 6

readNodeStatusFunction · 0.85
assembleNodeStatusesFunction · 0.85
ErrorfMethod · 0.80
HomeDirMethod · 0.65
NodeHostnameMethod · 0.65
IsRunningMethod · 0.65