| 263 | * @category compaction |
| 264 | */ |
| 265 | export const groupCompaction = <Events extends Event.Any, R>( |
| 266 | group: EventGroup<Events>, |
| 267 | effect: (options: { |
| 268 | readonly primaryKey: string |
| 269 | readonly entries: Array<Entry> |
| 270 | readonly events: Array<Event.TaggedPayload<Events>> |
| 271 | readonly write: <Tag extends Event.Tag<Events>>( |
| 272 | tag: Tag, |
| 273 | payload: Event.PayloadWithTag<Events, Tag> |
| 274 | ) => Effect.Effect<void> |
| 275 | }) => Effect.Effect<void, never, R> |
| 276 | ): Layer.Layer<never, never, Identity | EventJournal | R | Event.Context<Events>> => |
| 277 | Effect.gen(function*() { |
| 278 | const log = yield* EventLog |
| 279 | const context = yield* Effect.context<R | Event.Context<Events>>() |
| 280 | |
| 281 | yield* log.registerCompaction({ |
| 282 | events: Object.keys(group.events), |
| 283 | effect: Effect.fnUntraced(function*({ entries, write }) { |
| 284 | const writePayload = (timestamp: number, tag: string, payload: any) => |
| 285 | Effect.gen(function*() { |
| 286 | const event = group.events[tag] as any as Event.AnyWithProps |
| 287 | const entry = new Entry({ |
| 288 | id: makeEntryId({ msecs: timestamp }), |
| 289 | event: tag, |
| 290 | payload: yield* (Schema.encode(event.payloadMsgPack)(payload).pipe( |
| 291 | Effect.locally(FiberRef.currentContext, context), |
| 292 | Effect.orDie |
| 293 | ) as Effect.Effect<Uint8Array>), |
| 294 | primaryKey: event.primaryKey(payload) |
| 295 | }, { disableValidation: true }) |
| 296 | yield* write(entry) |
| 297 | }) |
| 298 | |
| 299 | const byPrimaryKey = new Map< |
| 300 | string, |
| 301 | { |
| 302 | readonly entries: Array<Entry> |
| 303 | readonly taggedPayloads: Array<{ |
| 304 | readonly _tag: string |
| 305 | readonly payload: any |
| 306 | }> |
| 307 | } |
| 308 | >() |
| 309 | for (const entry of entries) { |
| 310 | const payload = |
| 311 | yield* (Schema.decodeUnknown((group.events[entry.event] as any).payloadMsgPack)(entry.payload).pipe( |
| 312 | Effect.locally(FiberRef.currentContext, context) |
| 313 | ) as Effect.Effect<any>) |
| 314 | |
| 315 | if (byPrimaryKey.has(entry.primaryKey)) { |
| 316 | const record = byPrimaryKey.get(entry.primaryKey)! |
| 317 | record.entries.push(entry) |
| 318 | record.taggedPayloads.push({ |
| 319 | _tag: entry.event, |
| 320 | payload |
| 321 | }) |
| 322 | } else { |