MCPcopy
hub / github.com/Effect-TS/effect / queryStream

Function queryStream

packages/sql-mysql2/src/MysqlClient.ts:328–357  ·  view source on GitHub ↗
(
  conn: Mysql.PoolConnection,
  sql: string,
  params?: ReadonlyArray<any>
)

Source from the content-addressed store, hash-verified

326const escape = Statement.defaultEscape("`")
327
/**
 * Runs `sql` on `conn` and exposes the driver's row stream through
 * `asyncPauseResume`.
 *
 * Rows delivered by the stream's "data" event are collected in a buffer and
 * handed to `emit.array` in batches: the first row of a batch schedules a
 * microtask, and every row that arrives before that microtask runs is emitted
 * in the same array.
 *
 * @param conn - Pooled connection the query is issued on.
 * @param sql - Statement text passed to `conn.query`.
 * @param params - Optional parameter values passed to `conn.query` with `sql`.
 * @returns Whatever `asyncPauseResume` produces for the registered callbacks;
 * a stream "error" event is reported through `emit.fail` as a `SqlError` with
 * the message "Failed to stream statement".
 */
function queryStream(
  conn: Mysql.PoolConnection,
  sql: string,
  params?: ReadonlyArray<any>
) {
  return asyncPauseResume<any, SqlError>((emit) => {
    // `query(...).stream()` is reached through an `any` cast on the connection.
    const query = (conn as any).query(sql, params).stream()
    let buffer: Array<any> = []
    let taskPending = false

    // Emits every buffered row as one array and resets the batching state.
    // A no-op when nothing is buffered, so it is safe to call more than once.
    const flush = () => {
      taskPending = false
      if (buffer.length === 0) {
        return
      }
      const items = buffer
      buffer = []
      emit.array(items)
    }

    query.on("error", (cause: unknown) => emit.fail(new SqlError({ cause, message: "Failed to stream statement" })))
    query.on("data", (row: any) => {
      buffer.push(row)
      if (!taskPending) {
        taskPending = true
        queueMicrotask(flush)
      }
    })
    query.on("end", () => {
      // "end" can be dispatched before the pending microtask has run (Node
      // processes `process.nextTick` callbacks ahead of microtasks), so drain
      // the buffer first to guarantee the last rows are emitted before `end`.
      flush()
      emit.end()
    })

    return {
      onInterrupt: Effect.sync(() => query.destroy()),
      onPause: Effect.sync(() => query.pause()),
      onResume: Effect.sync(() => query.resume())
    }
  })
}

Callers 1

executeStreamMethod · 0.85

Calls 8

asyncPauseResumeFunction · 0.90
arrayMethod · 0.80
syncMethod · 0.80
destroyMethod · 0.80
resumeMethod · 0.80
failMethod · 0.65
endMethod · 0.65
streamMethod · 0.45

Tested by

no test coverage detected