MCPcopy Index your code
hub / github.com/subtrace/subtrace / doInsert

Function doInsert

tracer/manager.go:80–118  ·  view source on GitHub ↗
(ctx context.Context, conn *websocket.Conn, events []string)

Source from the content-addressed store, hash-verified

78}
79
80func doInsert(ctx context.Context, conn *websocket.Conn, events []string) (int, error) {
81 // TODO(adtac): it's wasteful to re-encode and copy the data into yet another
82 // byte buffer (similarly for the read). Consider using protodelim instead?
83 tunnelQueryID := uuid.New()
84 qmsg, err := proto.Marshal(&tunnel.Insert{TunnelQueryId: tunnelQueryID.String(), Events: events})
85 if err != nil {
86 return 0, fmt.Errorf("query: marshal: %w", err)
87 }
88 if err := conn.Write(ctx, websocket.MessageBinary, qmsg); err != nil {
89 return len(qmsg), fmt.Errorf("query: write: %w", err)
90 }
91
92 var result tunnel.Result
93 typ, rmsg, err := conn.Read(ctx)
94 if err != nil {
95 return len(qmsg), fmt.Errorf("result: read: %w", err)
96 }
97 if typ != websocket.MessageBinary {
98 return len(qmsg), fmt.Errorf("result: unexpected websocket message type %d", typ)
99 }
100 if err := proto.Unmarshal(rmsg, &result); err != nil {
101 return len(qmsg), fmt.Errorf("result: unmarshal: %w", err)
102 }
103
104 if result.TunnelQueryId != tunnelQueryID.String() {
105 // TODO(adtac): This shouldn't really be an error. We're opening a new
106 // tunnel session for every INSERT, which is wasteful. When we start
107 // reusing the same tunnel for multiple queries, there needs to be a tunnel
108 // manager that routes results to the right query based on the query ID.
109 return len(qmsg), fmt.Errorf("got result query ID %s, want %s", result.TunnelQueryId, tunnelQueryID.String())
110 }
111 if result.TunnelError != "" {
112 return len(qmsg), fmt.Errorf("tunnel error: %s", result.TunnelError)
113 }
114 if result.ClickhouseError != "" {
115 return len(qmsg), fmt.Errorf("clickhouse error: %s", result.ClickhouseError)
116 }
117 return len(qmsg), nil
118}
119
120func (b *block) LogValue() slog.Value {
121 return slog.GroupValue(

Callers 1

flushOnceMethod · 0.85

Calls 3

WriteMethod · 0.80
StringMethod · 0.45
ReadMethod · 0.45

Tested by

no test coverage detected