MCPcopy
hub / github.com/Effect-TS/effect / ConnectionImpl

Class ConnectionImpl

packages/sql-pg/src/PgClient.ts:123–273  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

121 undefined
122
123 class ConnectionImpl implements Connection {
/**
 * Client this connection was constructed with, or `undefined` when none was
 * supplied. `runWithClient` checks this field: when it is defined, work runs
 * on this client instead of one obtained through `pool.connect`.
 */
124 readonly pg: Pg.PoolClient | undefined
/**
 * @param pg Optional client to store on the instance. When omitted,
 * `this.pg` is `undefined`.
 */
125 constructor(pg?: Pg.PoolClient) {
126 this.pg = pg
127 }
128
/**
 * Runs the callback-style operation `f` against a pg client and exposes its
 * outcome as an `Effect.async<A, SqlError>`.
 *
 * Two paths:
 * - `this.pg` is defined: `f` is called with that client and the raw `resume`.
 *   Nothing is acquired from or released to the pool in this branch.
 * - `this.pg` is undefined: a client is requested with `pool.connect`, `f` is
 *   called with it, and the client is released (at most once) when `f`
 *   resumes, when the client emits "error", or when the effect returned to
 *   `Effect.async` runs.
 *
 * NOTE(review): `pool` and `makeCancel` are defined outside this view.
 * `makeCancel(pool, client)` presumably yields an effect that cancels the
 * in-flight query on `client` — confirm against its definition.
 * NOTE(review): per Effect's documentation, the effect returned from the
 * `Effect.async` registration callback is executed when the fiber is
 * interrupted — confirm for the Effect version in use.
 *
 * @param f Operation to run. It receives the client and a `resume` function
 * that it must call with the effect describing the result.
 * @returns An effect that succeeds with `A` or fails with `SqlError`. Failure
 * messages produced here are "Failed to acquire connection" and
 * "Connection error"; anything else comes from what `f` passes to `resume`.
 */
129 private runWithClient<A>(f: (client: Pg.PoolClient, resume: (_: Effect.Effect<A, SqlError>) => void) => void) {
// A client was supplied at construction: use it directly.
130 if (this.pg !== undefined) {
131 return Effect.async<A, SqlError>((resume) => {
132 f(this.pg!, resume)
133 return makeCancel(pool, this.pg!)
134 })
135 }
136 return Effect.async<A, SqlError>((resume) => {
// Set by cleanup(). Lets the connect callback detect that cleanup already ran
// before a client was handed over.
137 let done = false
// Both stay undefined until pool.connect delivers a client.
138 let cancel: Effect.Effect<void> | undefined = undefined
139 let client: Pg.PoolClient | undefined = undefined
// Listener for the client's "error" event: clean up, passing the error through
// to release(), then fail the effect with "Connection error".
// NOTE(review): node-postgres documents that a truthy argument to release()
// destroys the client instead of returning it to the pool — confirm.
140 function onError(cause: Error) {
141 cleanup(cause)
142 resume(Effect.fail(new SqlError({ cause, message: "Connection error" })))
143 }
// Releases the client only on the first call (guarded by `done`) and detaches
// the error listener. While `client` is still undefined the optional chaining
// makes both calls no-ops, so the only effect is setting `done`.
144 function cleanup(cause?: Error) {
145 if (!done) client?.release(cause)
146 done = true
147 client?.off("error", onError)
148 }
149 pool.connect((cause, client_) => {
// Acquisition failed: no client was stored, so there is nothing to release.
150 if (cause) {
151 return resume(Effect.fail(new SqlError({ cause, message: "Failed to acquire connection" })))
// No error but also no client: reported as an acquisition failure.
152 } else if (!client_) {
153 return resume(
154 Effect.fail(
155 new SqlError({ message: "Failed to acquire connection", cause: new Error("No client returned") })
156 )
157 )
// cleanup() already ran before this callback fired (the effect returned below
// calls it when `cancel` is still unset), so release the just-acquired client
// immediately and do not run `f`.
158 } else if (done) {
159 client_.release()
160 return
161 }
162 client = client_
// Listener is registered with `once`; cleanup() removes it with `off`.
163 client.once("error", onError)
164 cancel = makeCancel(pool, client)
// Wrap resume so cleanup() runs before the result is delivered.
165 f(client, (eff) => {
166 cleanup()
167 resume(eff)
168 })
169 })
// Effect handed back to Effect.async. Wrapped in suspend so `cancel` is read
// when this effect runs, not when it is constructed.
170 return Effect.suspend(() => {
// No client acquired yet: cleanup() only marks `done`, which makes the pending
// connect callback release its client on arrival.
171 if (!cancel) {
172 cleanup()
173 return Effect.void
174 }
// Client acquired: run `cancel`, with cleanup() attached via Effect.ensuring.
175 return Effect.ensuring(cancel, Effect.sync(cleanup))
176 })
177 })
178 }
179
180 private run(query: string, params: ReadonlyArray<unknown>) {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected