(input: StreamInput)
| 389 | } |
| 390 | |
| 391 | function createLayer(input: StreamInput) { |
| 392 | return Layer.fresh( |
| 393 | Layer.effect( |
| 394 | Service, |
| 395 | Effect.gen(function* () { |
| 396 | const scope = yield* Scope.make() |
| 397 | const abort = yield* Scope.provide(scope)( |
| 398 | Effect.acquireRelease( |
| 399 | Effect.sync(() => new AbortController()), |
| 400 | (abort) => Effect.sync(() => abort.abort()), |
| 401 | ), |
| 402 | ) |
| 403 | let closed = false |
| 404 | let closeStream = () => {} |
| 405 | const halt = () => { |
| 406 | abort.abort() |
| 407 | } |
| 408 | const stop = () => { |
| 409 | input.signal?.removeEventListener("abort", halt) |
| 410 | abort.abort() |
| 411 | closeStream() |
| 412 | } |
| 413 | const closeScope = () => { |
| 414 | if (closed) { |
| 415 | return Effect.void |
| 416 | } |
| 417 | |
| 418 | closed = true |
| 419 | stop() |
| 420 | return Scope.close(scope, Exit.void) |
| 421 | } |
| 422 | |
| 423 | input.signal?.addEventListener("abort", halt, { once: true }) |
| 424 | yield* Effect.addFinalizer(() => closeScope()) |
| 425 | |
| 426 | const events = yield* Scope.provide(scope)( |
| 427 | Effect.acquireRelease( |
| 428 | Effect.promise(() => |
| 429 | input.sdk.global.event({ |
| 430 | signal: abort.signal, |
| 431 | }), |
| 432 | ), |
| 433 | (events) => |
| 434 | Effect.sync(() => { |
| 435 | void events.stream.return(StreamClosed).catch(() => {}) |
| 436 | }), |
| 437 | ), |
| 438 | ) |
| 439 | closeStream = () => { |
| 440 | void events.stream.return(StreamClosed).catch(() => {}) |
| 441 | } |
| 442 | input.trace?.write("recv.subscribe", { |
| 443 | sessionID: input.sessionID, |
| 444 | }) |
| 445 | |
| 446 | const state: State = { |
| 447 | data: createSessionData(), |
| 448 | subagent: createSubagentData(), |
no test coverage detected