MCP · copy · Index your code
hub / github.com/anomalyco/opencode / layerWith

Function layerWith

packages/core/src/event.ts:170–635  ·  view source on GitHub ↗
(options?: LayerOptions)

Source from the content-addressed store, hash-verified

168}
169
170export const layerWith = (options?: LayerOptions) =>
171 Layer.effect(
172 Service,
173 Effect.gen(function* () {
174 const pubsub = {
175 all: yield* PubSub.unbounded<Payload>(),
176 durable: new Map<string, Set<PubSub.PubSub<void>>>(),
177 typed: new Map<string, PubSub.PubSub<Payload>>(),
178 }
179 const projectors = new Map<string, Subscriber[]>()
180 // TODO: Bind durable projectors to exact type+version before supporting incompatible historical payloads.
181 const listeners = new Array<Subscriber>()
182 const { db } = yield* Database.Service
183
184 const getOrCreate = (definition: Definition) =>
185 Effect.gen(function* () {
186 const existing = pubsub.typed.get(definition.type)
187 if (existing) return existing
188 const created = yield* PubSub.unbounded<Payload>()
189 pubsub.typed.set(definition.type, created)
190 return created
191 })
192
193 yield* Effect.addFinalizer(() =>
194 Effect.gen(function* () {
195 yield* PubSub.shutdown(pubsub.all)
196 yield* Effect.forEach(
197 pubsub.durable.values(),
198 (pubsubs) => Effect.forEach(pubsubs, PubSub.shutdown, { discard: true }),
199 { discard: true },
200 )
201 yield* Effect.forEach(pubsub.typed.values(), PubSub.shutdown, { discard: true })
202 }),
203 )
204
205 function commitDurableEvent(
206 definition: Definition,
207 event: Payload,
208 input?: {
209 readonly seq: number
210 readonly aggregateID: string
211 readonly ownerID?: string
212 readonly strictOwner?: boolean
213 },
214 commit?: (seq: number) => Effect.Effect<void>,
215 ) {
216 return Effect.gen(function* () {
217 const durable = definition?.durable
218 if (durable) {
219 const aggregateID = (event.data as Record<string, unknown>)[durable.aggregate]
220 if (typeof aggregateID !== "string") {
221 yield* Effect.die(
222 new InvalidDurableEventError({
223 type: event.type,
224 message: `Expected string aggregate field ${durable.aggregate}`,
225 }),
226 )
227 } else {

Callers 1

event.ts · File · 0.70

Calls 1

values · Method · 0.45

Tested by

no test coverage detected