MCPcopy Index your code
hub / github.com/Effect-TS/effect / groupCompaction

Function groupCompaction

packages/experimental/src/EventLog.ts:265–347  ·  view source on GitHub ↗
(
  group: EventGroup<Events>,
  effect: (options: {
    readonly primaryKey: string
    readonly entries: Array<Entry>
    readonly events: Array<Event.TaggedPayload<Events>>
    readonly write: <Tag extends Event.Tag<Events>>(
      tag: Tag,
      payload: Event.PayloadWithTag<Events, Tag>
    ) => Effect.Effect<void>
  }) => Effect.Effect<void, never, R>
)

Source from the content-addressed store, hash-verified

263 * @category compaction
264 */
265export const groupCompaction = <Events extends Event.Any, R>(
266 group: EventGroup<Events>,
267 effect: (options: {
268 readonly primaryKey: string
269 readonly entries: Array<Entry>
270 readonly events: Array<Event.TaggedPayload<Events>>
271 readonly write: <Tag extends Event.Tag<Events>>(
272 tag: Tag,
273 payload: Event.PayloadWithTag<Events, Tag>
274 ) => Effect.Effect<void>
275 }) => Effect.Effect<void, never, R>
276): Layer.Layer<never, never, Identity | EventJournal | R | Event.Context<Events>> =>
277 Effect.gen(function*() {
278 const log = yield* EventLog
279 const context = yield* Effect.context<R | Event.Context<Events>>()
280
281 yield* log.registerCompaction({
282 events: Object.keys(group.events),
283 effect: Effect.fnUntraced(function*({ entries, write }) {
284 const writePayload = (timestamp: number, tag: string, payload: any) =>
285 Effect.gen(function*() {
286 const event = group.events[tag] as any as Event.AnyWithProps
287 const entry = new Entry({
288 id: makeEntryId({ msecs: timestamp }),
289 event: tag,
290 payload: yield* (Schema.encode(event.payloadMsgPack)(payload).pipe(
291 Effect.locally(FiberRef.currentContext, context),
292 Effect.orDie
293 ) as Effect.Effect<Uint8Array>),
294 primaryKey: event.primaryKey(payload)
295 }, { disableValidation: true })
296 yield* write(entry)
297 })
298
299 const byPrimaryKey = new Map<
300 string,
301 {
302 readonly entries: Array<Entry>
303 readonly taggedPayloads: Array<{
304 readonly _tag: string
305 readonly payload: any
306 }>
307 }
308 >()
309 for (const entry of entries) {
310 const payload =
311 yield* (Schema.decodeUnknown((group.events[entry.event] as any).payloadMsgPack)(entry.payload).pipe(
312 Effect.locally(FiberRef.currentContext, context)
313 ) as Effect.Effect<any>)
314
315 if (byPrimaryKey.has(entry.primaryKey)) {
316 const record = byPrimaryKey.get(entry.primaryKey)!
317 record.entries.push(entry)
318 record.taggedPayloads.push({
319 _tag: entry.event,
320 payload
321 })
322 } else {

Callers

nothing calls this directly

Calls 7

keysMethod · 0.80
pipeMethod · 0.65
contextMethod · 0.65
getMethod · 0.65
setMethod · 0.65
provideMethod · 0.65
effectFunction · 0.50

Tested by

no test coverage detected