MCPcopy
hub / github.com/benbjohnson/litestream / Read

Method Read

internal/resumable_reader.go:66–132  ·  view source on GitHub ↗
(p []byte)

Source from the content-addressed store, hash-verified

64const resumableReaderMaxRetries = 3 // Upper bound on the Read retry loop, which runs attempt = 0..resumableReaderMaxRetries (inclusive), i.e. at most resumableReaderMaxRetries+1 iterations per Read call.
65
66func (r *ResumableReader) Read(p []byte) (int, error) {
67 var lastErr error
68 for attempt := 0; attempt <= resumableReaderMaxRetries; attempt++ {
69 // Reopen the stream from the current offset if the previous
70 // connection was closed (rc is nil after a retry).
71 if r.rc == nil {
72 rc, err := r.client.OpenLTXFile(r.ctx, r.level, r.minTXID, r.maxTXID, r.offset, 0)
73 if err != nil {
74 if errors.Is(err, os.ErrNotExist) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || r.ctx.Err() != nil {
75 return 0, fmt.Errorf("reopen ltx file at offset %d: %w", r.offset, err)
76 }
77 lastErr = fmt.Errorf("reopen ltx file at offset %d: %w", r.offset, err)
78 r.logger.Debug("reopen ltx file failed, retrying",
79 "level", r.level, "min", r.minTXID, "max", r.maxTXID,
80 "offset", r.offset, "error", err, "attempt", attempt+1)
81 continue
82 }
83 r.rc = rc
84 }
85
86 n, err := r.rc.Read(p)
87 r.offset += int64(n)
88
89 if err == nil {
90 return n, nil
91 }
92
93 if err == io.EOF {
94 // Distinguish legitimate EOF (fully read) from premature EOF
95 // (server closed idle connection). When the file size is known
96 // and we haven't read it all, treat as a connection drop.
97 if r.size > 0 && r.offset < r.size {
98 r.logger.Debug("premature EOF on ltx file, reconnecting",
99 "level", r.level, "min", r.minTXID, "max", r.maxTXID,
100 "offset", r.offset, "size", r.size, "attempt", attempt+1)
101 r.close()
102 r.rc = nil
103 if n > 0 {
104 // Return the bytes we did get. The caller (e.g. io.ReadFull)
105 // will call Read again, which will trigger the reopen above.
106 return n, nil
107 }
108 continue
109 }
110 return n, io.EOF
111 }
112
113 // Non-EOF error (connection reset, timeout, etc.). Close the dead
114 // stream so the next iteration reopens from the current offset.
115 r.logger.Debug("read error on ltx file, reconnecting",
116 "level", r.level, "min", r.minTXID, "max", r.maxTXID,
117 "error", err, "offset", r.offset, "attempt", attempt+1)
118 r.close()
119 r.rc = nil
120 if n > 0 {
121 return n, nil
122 }
123 continue

Callers 1

TestResumableReader (Function) · 0.95

Calls 4

close (Method) · 0.95
Is (Method) · 0.80
OpenLTXFile (Method) · 0.65
Err (Method) · 0.45

Tested by 1

TestResumableReader (Function) · 0.76