MCPcopy Index your code
hub / github.com/Effect-TS/effect / take

Method take

packages/effect/src/internal/queue.ts:239–268  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

237 }
238
/**
 * Takes the next element from the queue.
 *
 * - If the shutdown flag is set, the effect is an interruption.
 * - If an element is immediately available it is returned, after the
 *   strategy has been told that the queue has free space.
 * - Otherwise a deferred owned by the calling fiber is registered as a
 *   taker and awaited; the registration is removed again if the wait is
 *   interrupted.
 */
get take(): Effect.Effect<A> {
  return core.withFiberRuntime((fiber) => {
    // A queue that has been shut down interrupts anyone who tries to take.
    if (MutableRef.get(this.shutdownFlag)) {
      return core.interrupt
    }

    // Fast path: poll with the sentinel so "empty" can be told apart from
    // a real element.
    const polled = this.queue.poll(MutableQueue.EmptyMutableQueue)
    if (polled !== MutableQueue.EmptyMutableQueue) {
      this.strategy.unsafeOnQueueEmptySpace(this.queue, this.takers)
      return core.succeed(polled)
    }

    // Slow path: nothing available right now, so park a deferred in the
    // takers queue.
    const taker = core.deferredUnsafeMake<A>(fiber.id())
    const registerAndAwait = core.suspend(() => {
      // Register first, then try to complete takers again in case a value
      // was added since the poll above.
      pipe(this.takers, MutableQueue.offer(taker))
      unsafeCompleteTakers(this.strategy, this.queue, this.takers)
      // Re-check the shutdown flag before waiting on the deferred.
      if (MutableRef.get(this.shutdownFlag)) {
        return core.interrupt
      }
      return core.deferredAwait(taker)
    })

    // On interruption, remove the deferred so it does not linger in takers.
    return pipe(
      registerAndAwait,
      core.onInterrupt(() => core.sync(() => unsafeRemove(this.takers, taker)))
    )
  })
}
269
270 get takeAll(): Effect.Effect<Chunk.Chunk<A>> {
271 return core.suspend(() => {

Callers

nothing calls this directly

Calls 8

sync · Method · 0.80
pipe · Function · 0.70
unsafeRemove · Function · 0.70
get · Method · 0.65
poll · Method · 0.65
id · Method · 0.65
offer · Method · 0.65

Tested by

no test coverage detected