MCPcopy
hub / github.com/weaviate/weaviate / writeExportMetadata

Function writeExportMetadata

usecases/export/scheduler.go:821–865  ·  view source on GitHub ↗

writeExportMetadata writes the given ExportMetadata to the storage backend with retries. It uses a fresh context with a timeout so the write succeeds even if the original context was canceled.

(
	backend modulecapabilities.BackupBackend,
	exportID, bucket, path string,
	metadata *ExportMetadata,
	logger logrus.FieldLogger,
)

Source from the content-addressed store, hash-verified

819// with retries. It uses a fresh context with a timeout so the write succeeds
820// even if the original context was canceled.
821func writeExportMetadata(
822 backend modulecapabilities.BackupBackend,
823 exportID, bucket, path string,
824 metadata *ExportMetadata,
825 logger logrus.FieldLogger,
826) error {
827 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
828 defer cancel()
829
830 // Ensure CompletedAt is always set for terminal metadata so the file
831 // has a meaningful timestamp even when no node reported a completion
832 // time (e.g. all nodes crashed or the export was canceled/shut down).
833 if metadata.CompletedAt.IsZero() {
834 switch metadata.Status {
835 case export.Success, export.Failed, export.Canceled:
836 metadata.CompletedAt = time.Now().UTC()
837 case export.Started, export.Transferring:
838 // Non-terminal — no CompletedAt needed.
839 }
840 }
841
842 data, err := json.MarshalIndent(metadata, "", " ")
843 if err != nil {
844 return fmt.Errorf("marshal metadata: %w", err)
845 }
846
847 const maxRetries = 3
848 for attempt := range maxRetries {
849 _, err = backend.Write(ctx, exportID, exportMetadataFile, bucket, path, newBytesReadCloser(data))
850 if err == nil {
851 return nil
852 }
853 if attempt < maxRetries-1 {
854 logger.WithField("action", "export_write_metadata").
855 WithField("export_id", exportID).
856 Warnf("metadata write attempt %d failed, retrying: %v", attempt+1, err)
857 select {
858 case <-ctx.Done():
859 return fmt.Errorf("write metadata aborted after %d attempts: %w", attempt+1, ctx.Err())
860 case <-time.After(500 * time.Millisecond):
861 }
862 }
863 }
864 return fmt.Errorf("write metadata after %d attempts: %w", maxRetries, err)
865}
866
867// readNodeStatus reads and unmarshals a node's status file from the storage backend.
868func readNodeStatus(ctx context.Context, backend modulecapabilities.BackupBackend, exportID, bucket, path, nodeName string) (*NodeStatus, error) {

Callers 4

Status (Method) · 0.85
Cancel (Method) · 0.85
startExport (Method) · 0.85
tryPromoteMetadata (Method) · 0.85

Calls 7

newBytesReadCloser (Function) · 0.85
Errorf (Method) · 0.80
Now (Method) · 0.65
Write (Method) · 0.65
Done (Method) · 0.65
Err (Method) · 0.65
WithTimeout (Method) · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…