| 116 | ) |
| 117 | |
| 118 | class ConnectionImpl implements Connection { |
| 119 | constructor(private readonly conn: Clickhouse.ClickHouseClient) {} |
| 120 | |
| 121 | private runRaw(sql: string, params: ReadonlyArray<unknown>, format: Clickhouse.DataFormat = "JSON") { |
| 122 | const paramsObj: Record<string, unknown> = {} |
| 123 | for (let i = 0; i < params.length; i++) { |
| 124 | paramsObj[`p${i + 1}`] = params[i] |
| 125 | } |
| 126 | return Effect.withFiberRuntime<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((fiber) => { |
| 127 | const method = fiber.getFiberRef(currentClientMethod) |
| 128 | return Effect.async<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((resume) => { |
| 129 | const queryId = fiber.getFiberRef(currentQueryId) ?? Crypto.randomUUID() |
| 130 | const settings = fiber.getFiberRef(currentClickhouseSettings) ?? {} |
| 131 | const controller = new AbortController() |
| 132 | if (method === "command") { |
| 133 | this.conn.command({ |
| 134 | query: sql, |
| 135 | query_params: paramsObj, |
| 136 | abort_signal: controller.signal, |
| 137 | query_id: queryId, |
| 138 | clickhouse_settings: settings |
| 139 | }).then( |
| 140 | (result) => resume(Effect.succeed(result)), |
| 141 | (cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to execute statement" }))) |
| 142 | ) |
| 143 | } else { |
| 144 | this.conn.query({ |
| 145 | query: sql, |
| 146 | query_params: paramsObj, |
| 147 | abort_signal: controller.signal, |
| 148 | query_id: queryId, |
| 149 | clickhouse_settings: settings, |
| 150 | format |
| 151 | }).then( |
| 152 | (result) => resume(Effect.succeed(result)), |
| 153 | (cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to execute statement" }))) |
| 154 | ) |
| 155 | } |
| 156 | return Effect.suspend(() => { |
| 157 | controller.abort() |
| 158 | return Effect.promise(() => this.conn.command({ query: `KILL QUERY WHERE query_id = '${queryId}'` })) |
| 159 | }) |
| 160 | }) |
| 161 | }) |
| 162 | } |
| 163 | |
| 164 | private run(sql: string, params: ReadonlyArray<unknown>, format?: Clickhouse.DataFormat) { |
| 165 | return this.runRaw(sql, params, format).pipe( |
| 166 | Effect.flatMap((result) => { |
| 167 | if ("json" in result) { |
| 168 | return Effect.promise(() => |
| 169 | result.json().then( |
| 170 | (result) => "data" in result ? result.data : result as any, |
| 171 | () => [] |
| 172 | ) |
| 173 | ) |
| 174 | } |
| 175 | return Effect.succeed([]) |
nothing calls this directly
no outgoing calls
no test coverage detected