MCPcopy
hub / github.com/subtrace/subtrace / flushOnce

Method flushOnce

tracer/manager.go:128–182  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

126}
127
128func (b *block) flushOnce(ctx context.Context) error {
129 b.mu.Lock()
130 defer b.mu.Unlock()
131 if b.flushed {
132 return fmt.Errorf("block already flushed")
133 }
134
135 b.frozen = true
136
137 // Check the actual array instead of count to see if the block is empty.
138 // b.count exists only because it's nice to log the approximate size of the
139 // data in the block without locking it.
140 if len(b.events) == 0 {
141 b.flushed = true
142 return nil
143 }
144
145 slog.Debug("flushing event buffer block", "block", b)
146
147 var tun tunnel.Create_Response
148 if code, err := rpc.Call(ctx, &tun, "/api/CreateTunnel", &tunnel.Create_Request{Role: tunnel.Role_INSERT}); err != nil {
149 return fmt.Errorf("call CreateTunnel: %w", err)
150 } else if code != http.StatusOK || tun.Error != "" {
151 err := fmt.Errorf("CreateTunnel: %s", http.StatusText(code))
152 if tun.Error != "" {
153 err = fmt.Errorf("%w: %s", err, tun.Error)
154 }
155 return err
156 }
157
158 tunnelID, err := uuid.Parse(tun.TunnelId)
159 if err != nil {
160 return fmt.Errorf("parse tunnelID: %w", err)
161 }
162 slog.Debug("created tunnel to push event buffer block", "block", b, "tunnelID", tunnelID)
163
164 start := time.Now()
165 conn, err := initTunnel(ctx, tunnelID, tun.Endpoint)
166 if err != nil {
167 var wsErr websocket.CloseError
168 if errors.As(err, &wsErr) && wsErr.Code == websocket.StatusGoingAway {
169 return fmt.Errorf("init tunnel: tunnel %s: timed out waiting for sink after %v", tunnelID.String(), time.Since(start))
170 }
171 return fmt.Errorf("init tunnel: tunnel %s: %w", tunnelID, err)
172 }
173 defer conn.Close(websocket.StatusNormalClosure, "")
174
175 if _, err := doInsert(ctx, conn, b.events); err != nil {
176 return fmt.Errorf("insert %d events (%d bytes): tunnel %s: %w", b.count.Load(), b.bytes.Load(), tunnelID, err)
177 }
178
179 b.flushed = true
180 slog.Debug("flushed data to clickhouse", "block", b, "took", time.Since(start).Round(time.Millisecond))
181 return nil
182}
183
184func (b *block) flush(ctx context.Context) error {
185 if os.Getenv("SUBTRACE_TOKEN") == "" {

Callers 1

flushMethod · 0.95

Calls 7

CallFunction · 0.92
initTunnelFunction · 0.85
doInsertFunction · 0.85
LockMethod · 0.80
LoadMethod · 0.80
StringMethod · 0.45
CloseMethod · 0.45

Tested by

no test coverage detected