MCPcopy
hub / github.com/Effect-TS/effect / invokeWithInterrupt

Function invokeWithInterrupt

packages/effect/src/internal/fiberRuntime.ts:3719–3786  ·  view source on GitHub ↗
(
  self: Effect.Effect<A, E, R>,
  entries: ReadonlyArray<Entry<unknown>>,
  onInterrupt?: () => void
)

Source from the content-addressed store, hash-verified

3717 entries: ReadonlyArray<Entry<unknown>>,
3718 onInterrupt?: () => void
3719) => Effect.Effect<void, E, R> = <A, E, R>(
3720 self: Effect.Effect<A, E, R>,
3721 entries: ReadonlyArray<Entry<unknown>>,
3722 onInterrupt?: () => void
3723) =>
3724 core.fiberIdWith((id) =>
3725 ensuring(
3726 core.flatMap(
3727 forkDaemon(core.interruptible(self)),
3728 (processing) =>
3729 core.async<void, E>((cb) => {
3730 const counts = entries.map((_) => _.listeners.count)
3731 const checkDone = () => {
3732 if (counts.every((count) => count === 0)) {
3733 if (
3734 entries.every((_) => {
3735 if (_.result.state.current._tag === "Pending") {
3736 return true
3737 } else if (
3738 _.result.state.current._tag === "Done" &&
3739 core.exitIsExit(_.result.state.current.effect) &&
3740 _.result.state.current.effect._tag === "Failure" &&
3741 internalCause.isInterrupted(_.result.state.current.effect.cause)
3742 ) {
3743 return true
3744 } else {
3745 return false
3746 }
3747 })
3748 ) {
3749 cleanup.forEach((f) => f())
3750 onInterrupt?.()
3751 cb(core.interruptFiber(processing))
3752 }
3753 }
3754 }
3755 processing.addObserver((exit) => {
3756 cleanup.forEach((f) => f())
3757 cb(exit)
3758 })
3759 const cleanup = entries.map((r, i) => {
3760 const observer = (count: number) => {
3761 counts[i] = count
3762 checkDone()
3763 }
3764 r.listeners.addObserver(observer)
3765 return () => r.listeners.removeObserver(observer)
3766 })
3767 checkDone()
3768 return core.sync(() => {
3769 cleanup.forEach((f) => f())
3770 })
3771 })
3772 ),
3773 core.suspend(() => {
3774 const residual = entries.flatMap((entry) => {
3775 if (!entry.state.completed) {
3776 return [entry]

Callers 3

runBlockedRequestsFunction · 0.85
fiberRuntime.tsFile · 0.85
makeBatchedFunction · 0.85

Calls 8

checkDoneFunction · 0.85
syncMethod · 0.80
forkDaemonFunction · 0.70
completeFunction · 0.70
mapMethod · 0.65
addObserverMethod · 0.65
removeObserverMethod · 0.65
fFunction · 0.50

Tested by

no test coverage detected