| 88 | * @since 1.0.0 |
| 89 | */ |
| 90 | export const make = ( |
| 91 | options: ClickhouseClientConfig |
| 92 | ): Effect.Effect<ClickhouseClient, SqlError, Scope.Scope | Reactivity.Reactivity> => |
| 93 | Effect.gen(function*() { |
| 94 | const compiler = makeCompiler(options.transformQueryNames) |
| 95 | const transformRows = options.transformResultNames |
| 96 | ? Statement.defaultTransforms(options.transformResultNames).array |
| 97 | : undefined |
| 98 | |
| 99 | const client = Clickhouse.createClient(options) |
| 100 | |
| 101 | yield* Effect.acquireRelease( |
| 102 | Effect.tryPromise({ |
| 103 | try: () => client.exec({ query: "SELECT 1" }), |
| 104 | catch: (cause) => new SqlError({ cause, message: "ClickhouseClient: Failed to connect" }) |
| 105 | }), |
| 106 | () => Effect.promise(() => client.close()) |
| 107 | ).pipe( |
| 108 | Effect.timeoutFail({ |
| 109 | duration: Duration.seconds(5), |
| 110 | onTimeout: () => |
| 111 | new SqlError({ |
| 112 | message: "ClickhouseClient: Connection timeout", |
| 113 | cause: new Error("connection timeout") |
| 114 | }) |
| 115 | }) |
| 116 | ) |
| 117 | |
| 118 | class ConnectionImpl implements Connection { |
| 119 | constructor(private readonly conn: Clickhouse.ClickHouseClient) {} |
| 120 | |
| 121 | private runRaw(sql: string, params: ReadonlyArray<unknown>, format: Clickhouse.DataFormat = "JSON") { |
| 122 | const paramsObj: Record<string, unknown> = {} |
| 123 | for (let i = 0; i < params.length; i++) { |
| 124 | paramsObj[`p${i + 1}`] = params[i] |
| 125 | } |
| 126 | return Effect.withFiberRuntime<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((fiber) => { |
| 127 | const method = fiber.getFiberRef(currentClientMethod) |
| 128 | return Effect.async<Clickhouse.ResultSet<"JSON"> | Clickhouse.CommandResult, SqlError>((resume) => { |
| 129 | const queryId = fiber.getFiberRef(currentQueryId) ?? Crypto.randomUUID() |
| 130 | const settings = fiber.getFiberRef(currentClickhouseSettings) ?? {} |
| 131 | const controller = new AbortController() |
| 132 | if (method === "command") { |
| 133 | this.conn.command({ |
| 134 | query: sql, |
| 135 | query_params: paramsObj, |
| 136 | abort_signal: controller.signal, |
| 137 | query_id: queryId, |
| 138 | clickhouse_settings: settings |
| 139 | }).then( |
| 140 | (result) => resume(Effect.succeed(result)), |
| 141 | (cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to execute statement" }))) |
| 142 | ) |
| 143 | } else { |
| 144 | this.conn.query({ |
| 145 | query: sql, |
| 146 | query_params: paramsObj, |
| 147 | abort_signal: controller.signal, |