| 78 | } |
| 79 | |
| 80 | func doInsert(ctx context.Context, conn *websocket.Conn, events []string) (int, error) { |
| 81 | // TODO(adtac): it's wasteful to re-encode and copy the data into yet another |
| 82 | // byte buffer (similarly for the read). Consider using protodelim instead? |
| 83 | tunnelQueryID := uuid.New() |
| 84 | qmsg, err := proto.Marshal(&tunnel.Insert{TunnelQueryId: tunnelQueryID.String(), Events: events}) |
| 85 | if err != nil { |
| 86 | return 0, fmt.Errorf("query: marshal: %w", err) |
| 87 | } |
| 88 | if err := conn.Write(ctx, websocket.MessageBinary, qmsg); err != nil { |
| 89 | return len(qmsg), fmt.Errorf("query: write: %w", err) |
| 90 | } |
| 91 | |
| 92 | var result tunnel.Result |
| 93 | typ, rmsg, err := conn.Read(ctx) |
| 94 | if err != nil { |
| 95 | return len(qmsg), fmt.Errorf("result: read: %w", err) |
| 96 | } |
| 97 | if typ != websocket.MessageBinary { |
| 98 | return len(qmsg), fmt.Errorf("result: unexpected websocket message type %d", typ) |
| 99 | } |
| 100 | if err := proto.Unmarshal(rmsg, &result); err != nil { |
| 101 | return len(qmsg), fmt.Errorf("result: unmarshal: %w", err) |
| 102 | } |
| 103 | |
| 104 | if result.TunnelQueryId != tunnelQueryID.String() { |
| 105 | // TODO(adtac): This shouldn't really be an error. We're opening a new |
| 106 | // tunnel session for every INSERT, which is wasteful. When we start |
| 107 | // reusing the same tunnel for multiple queries, there needs to be a tunnel |
| 108 | // manager that routes results to the right query based on the query ID. |
| 109 | return len(qmsg), fmt.Errorf("got result query ID %s, want %s", result.TunnelQueryId, tunnelQueryID.String()) |
| 110 | } |
| 111 | if result.TunnelError != "" { |
| 112 | return len(qmsg), fmt.Errorf("tunnel error: %s", result.TunnelError) |
| 113 | } |
| 114 | if result.ClickhouseError != "" { |
| 115 | return len(qmsg), fmt.Errorf("clickhouse error: %s", result.ClickhouseError) |
| 116 | } |
| 117 | return len(qmsg), nil |
| 118 | } |
| 119 | |
| 120 | func (b *block) LogValue() slog.Value { |
| 121 | return slog.GroupValue( |