| 126 | } |
| 127 | |
| 128 | func (b *block) flushOnce(ctx context.Context) error { |
| 129 | b.mu.Lock() |
| 130 | defer b.mu.Unlock() |
| 131 | if b.flushed { |
| 132 | return fmt.Errorf("block already flushed") |
| 133 | } |
| 134 | |
| 135 | b.frozen = true |
| 136 | |
| 137 | // Check the actual array instead of count to see if the block is empty. |
| 138 | // b.count exists only because it's nice to log the approximate size of the |
| 139 | // data in the block without locking it. |
| 140 | if len(b.events) == 0 { |
| 141 | b.flushed = true |
| 142 | return nil |
| 143 | } |
| 144 | |
| 145 | slog.Debug("flushing event buffer block", "block", b) |
| 146 | |
| 147 | var tun tunnel.Create_Response |
| 148 | if code, err := rpc.Call(ctx, &tun, "/api/CreateTunnel", &tunnel.Create_Request{Role: tunnel.Role_INSERT}); err != nil { |
| 149 | return fmt.Errorf("call CreateTunnel: %w", err) |
| 150 | } else if code != http.StatusOK || tun.Error != "" { |
| 151 | err := fmt.Errorf("CreateTunnel: %s", http.StatusText(code)) |
| 152 | if tun.Error != "" { |
| 153 | err = fmt.Errorf("%w: %s", err, tun.Error) |
| 154 | } |
| 155 | return err |
| 156 | } |
| 157 | |
| 158 | tunnelID, err := uuid.Parse(tun.TunnelId) |
| 159 | if err != nil { |
| 160 | return fmt.Errorf("parse tunnelID: %w", err) |
| 161 | } |
| 162 | slog.Debug("created tunnel to push event buffer block", "block", b, "tunnelID", tunnelID) |
| 163 | |
| 164 | start := time.Now() |
| 165 | conn, err := initTunnel(ctx, tunnelID, tun.Endpoint) |
| 166 | if err != nil { |
| 167 | var wsErr websocket.CloseError |
| 168 | if errors.As(err, &wsErr) && wsErr.Code == websocket.StatusGoingAway { |
| 169 | return fmt.Errorf("init tunnel: tunnel %s: timed out waiting for sink after %v", tunnelID.String(), time.Since(start)) |
| 170 | } |
| 171 | return fmt.Errorf("init tunnel: tunnel %s: %w", tunnelID, err) |
| 172 | } |
| 173 | defer conn.Close(websocket.StatusNormalClosure, "") |
| 174 | |
| 175 | if _, err := doInsert(ctx, conn, b.events); err != nil { |
| 176 | return fmt.Errorf("insert %d events (%d bytes): tunnel %s: %w", b.count.Load(), b.bytes.Load(), tunnelID, err) |
| 177 | } |
| 178 | |
| 179 | b.flushed = true |
| 180 | slog.Debug("flushed data to clickhouse", "block", b, "took", time.Since(start).Round(time.Millisecond)) |
| 181 | return nil |
| 182 | } |
| 183 | |
| 184 | func (b *block) flush(ctx context.Context) error { |
| 185 | if os.Getenv("SUBTRACE_TOKEN") == "" { |