MCPcopy
hub / github.com/Effect-TS/effect / MailboxImpl

Class MailboxImpl

packages/effect/src/internal/mailbox.ts:72–468  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

// Shared readonly tuple whose second element is `true`.
// NOTE(review): presumably the "no messages, done" result matching the class's
// [messages, done] result type; `empty` is defined outside this excerpt — confirm.
70const constDone = [empty, true] as const
71
72class MailboxImpl<A, E> extends Effectable.Class<readonly [messages: Chunk.Chunk<A>, done: boolean], E>
73 implements Api.Mailbox<A, E>
74{
// Properties keyed by the TypeId / ReadonlyTypeId symbols, typed as Api.TypeId / Api.ReadonlyTypeId.
75 readonly [TypeId]: Api.TypeId = TypeId
76 readonly [ReadonlyTypeId]: Api.ReadonlyTypeId = ReadonlyTypeId
// Current mailbox state. Starts as "Open" with empty `takers`, `offers` and `awaiters` sets;
// offer() and unsafeOffer() reject messages whenever `_tag` is not "Open".
77 private state: MailboxState<A, E> = {
78 _tag: "Open",
79 takers: new Set(),
80 offers: new Set(),
81 awaiters: new Set()
82 }
// Buffered messages. offer() compares messages.length + messagesChunk.length against
// `capacity`, so both containers count towards the limit; new messages are pushed onto `messages`.
83 private messages: Array<A> = []
84 private messagesChunk = Chunk.empty<A>()
/**
 * Stores the scheduler, capacity and overflow strategy as instance properties
 * (parameter properties) and calls the base-class constructor.
 *
 * @param scheduler - Kept on the instance; how it is used is outside this excerpt.
 * @param capacity - Limit that offer() compares the buffered message count against.
 * @param strategy - What offer() does once the buffer is at capacity:
 *   "dropping" rejects the message, "sliding" calls unsafeTake() and then appends it,
 *   "suspend" delegates to offerRemainingSingle() unless capacity <= 0 and a taker is waiting.
 */
85 constructor(
86 readonly scheduler: Scheduler,
87 private capacity: number,
88 readonly strategy: "suspend" | "dropping" | "sliding"
89 ) {
90 super()
91 }
92
/**
 * Offers a single message to the mailbox.
 *
 * The whole decision is wrapped in core.suspend, so the state and capacity checks
 * run when the returned effect is evaluated rather than when offer() is called.
 *
 * Outcomes:
 * - state is not "Open": returns exitFalse.
 * - buffer at or over capacity: handled according to `strategy` (see the switch below).
 * - otherwise: the message is appended, scheduleReleaseTaker() is called, and exitTrue is returned.
 *
 * @param message - The value to enqueue.
 * @returns An effect yielding a boolean. NOTE(review): exitTrue / exitFalse are defined
 *   outside this excerpt; presumably they succeed with `true` (accepted) / `false` (rejected) — confirm.
 */
93 offer(message: A): Effect<boolean> {
94 return core.suspend(() => {
95 if (this.state._tag !== "Open") {
96 return exitFalse
97 } else if (this.messages.length + this.messagesChunk.length >= this.capacity) {
98 switch (this.strategy) {
99 case "dropping":
// Full buffer with "dropping": the new message is rejected and nothing is enqueued.
100 return exitFalse
101 case "suspend":
// Capacity of zero (or less) can never be "under capacity", so when a taker is
// already waiting the message is pushed and the taker released right away
// (releaseTaker, not the scheduled variant).
102 if (this.capacity <= 0 && this.state.takers.size > 0) {
103 this.messages.push(message)
104 this.releaseTaker()
105 return exitTrue
106 }
// Otherwise hand off to offerRemainingSingle (defined outside this excerpt).
107 return this.offerRemainingSingle(message)
108 case "sliding":
// Full buffer with "sliding": unsafeTake() is called before appending the new message.
// NOTE(review): presumably this removes the oldest buffered message — confirm in unsafeTake.
109 this.unsafeTake()
110 this.messages.push(message)
111 return exitTrue
112 }
113 }
// Reached only when under capacity: each of the three strategy cases above returns.
114 this.messages.push(message)
115 this.scheduleReleaseTaker()
116 return exitTrue
117 })
118 }
119 unsafeOffer(message: A): boolean {
120 if (this.state._tag !== "Open") {
121 return false
122 } else if (this.messages.length + this.messagesChunk.length >= this.capacity) {
123 if (this.strategy === "sliding") {
124 this.unsafeTake()
125 this.messages.push(message)
126 return true
127 } else if (this.capacity <= 0 && this.state.takers.size > 0) {
128 this.messages.push(message)
129 this.releaseTaker()

Callers

nothing calls this directly

Calls 11

finalizeMethod · 0.95
doneMethod · 0.95
unsafeTakeAllMethod · 0.95
releaseCapacityMethod · 0.95
unsafeTakeMethod · 0.95
unsafeSizeMethod · 0.95
syncMethod · 0.80
resumeMethod · 0.80
clearMethod · 0.80
resumeFunction · 0.70
addMethod · 0.65

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…