| 168 | } |
| 169 | |
| 170 | export const layerWith = (options?: LayerOptions) => |
| 171 | Layer.effect( |
| 172 | Service, |
| 173 | Effect.gen(function* () { |
// Mutable state shared by everything defined in this layer.
const pubsub = {
  // Unbounded PubSub of Payload; presumably receives every published event — confirm against the publish path.
  all: yield* PubSub.unbounded<Payload>(),
  // Sets of void-valued PubSubs grouped under a string key (key meaning is not visible here — verify against callers).
  durable: new Map<string, Set<PubSub.PubSub<void>>>(),
  // Payload PubSubs keyed by definition type; entries are added on demand by getOrCreate.
  typed: new Map<string, PubSub.PubSub<Payload>>(),
}
// Subscriber lists grouped under a string key.
const projectors: Map<string, Subscriber[]> = new Map()
// TODO: Bind durable projectors to exact type+version before supporting incompatible historical payloads.
// Flat list of subscribers.
const listeners: Subscriber[] = []
// Database handle taken from the Database service.
const db = (yield* Database.Service).db
| 183 | |
/**
 * Resolves the Payload PubSub registered under `definition.type`.
 *
 * If `pubsub.typed` has no entry for that type yet, an unbounded PubSub is
 * created, stored under the type, and returned; otherwise the stored one is
 * returned unchanged.
 */
const getOrCreate = (definition: Definition) =>
  Effect.gen(function* () {
    let channel = pubsub.typed.get(definition.type)
    if (!channel) {
      // First request for this type: create the PubSub and remember it.
      channel = yield* PubSub.unbounded<Payload>()
      pubsub.typed.set(definition.type, channel)
    }
    return channel
  })
| 192 | |
// Register a finalizer that shuts down every PubSub held in `pubsub`:
// first `all`, then each set stored in `durable`, then each entry of `typed`.
yield* Effect.addFinalizer(() => {
  // Shuts down all PubSubs of one durable set, discarding the per-item results.
  const shutdownGroup = (group: Set<PubSub.PubSub<void>>) =>
    Effect.forEach(group, PubSub.shutdown, { discard: true })

  return Effect.gen(function* () {
    yield* PubSub.shutdown(pubsub.all)
    yield* Effect.forEach(pubsub.durable.values(), shutdownGroup, { discard: true })
    yield* Effect.forEach(pubsub.typed.values(), PubSub.shutdown, { discard: true })
  })
})
| 204 | |
| 205 | function commitDurableEvent( |
| 206 | definition: Definition, |
| 207 | event: Payload, |
| 208 | input?: { |
| 209 | readonly seq: number |
| 210 | readonly aggregateID: string |
| 211 | readonly ownerID?: string |
| 212 | readonly strictOwner?: boolean |
| 213 | }, |
| 214 | commit?: (seq: number) => Effect.Effect<void>, |
| 215 | ) { |
| 216 | return Effect.gen(function* () { |
| 217 | const durable = definition?.durable |
| 218 | if (durable) { |
| 219 | const aggregateID = (event.data as Record<string, unknown>)[durable.aggregate] |
| 220 | if (typeof aggregateID !== "string") { |
| 221 | yield* Effect.die( |
| 222 | new InvalidDurableEventError({ |
| 223 | type: event.type, |
| 224 | message: `Expected string aggregate field ${durable.aggregate}`, |
| 225 | }), |
| 226 | ) |
| 227 | } else { |