| 684 | * @category Memory |
| 685 | */ |
| 686 | export class MemoryDriver extends Effect.Service<MemoryDriver>()("@effect/cluster/MessageStorage/MemoryDriver", { |
| 687 | dependencies: [Snowflake.layerGenerator], |
| 688 | effect: Effect.gen(function*() { |
| 689 | const clock = yield* Effect.clock |
| 690 | const requests = new Map<string, MemoryEntry>() |
| 691 | const requestsByPrimaryKey = new Map<string, MemoryEntry>() |
| 692 | const unprocessed = new Set<Envelope.Envelope.Encoded>() |
| 693 | const replyIds = new Set<string>() |
| 694 | |
| 695 | const journal: Array<Envelope.Envelope.Encoded> = [] |
| 696 | |
| 697 | const cursors = new WeakMap<{}, number>() |
| 698 | |
| 699 | const unprocessedWith = (predicate: Predicate<Envelope.Envelope.Encoded>) => { |
| 700 | const messages: Array<{ |
| 701 | readonly envelope: Envelope.Envelope.Encoded |
| 702 | readonly lastSentReply: Option.Option<Reply.ReplyEncoded<any>> |
| 703 | }> = [] |
| 704 | const now = clock.unsafeCurrentTimeMillis() |
| 705 | for (const envelope of unprocessed) { |
| 706 | if (!predicate(envelope)) { |
| 707 | continue |
| 708 | } |
| 709 | if (envelope._tag === "Request") { |
| 710 | const entry = requests.get(envelope.requestId) |
| 711 | if (entry?.deliverAt && entry.deliverAt > now) { |
| 712 | continue |
| 713 | } |
| 714 | messages.push({ |
| 715 | envelope, |
| 716 | lastSentReply: Option.fromNullable(entry?.replies[entry.replies.length - 1]) |
| 717 | }) |
| 718 | } else { |
| 719 | messages.push({ |
| 720 | envelope, |
| 721 | lastSentReply: Option.none() |
| 722 | }) |
| 723 | } |
| 724 | } |
| 725 | return messages |
| 726 | } |
| 727 | |
| 728 | const replyLatch = yield* Effect.makeLatch() |
| 729 | |
| 730 | function repliesFor(requestIds: Array<string>) { |
| 731 | const replies = Arr.empty<Reply.ReplyEncoded<any>>() |
| 732 | for (const requestId of requestIds) { |
| 733 | const request = requests.get(requestId) |
| 734 | if (!request) continue |
| 735 | else if (Option.isNone(request.lastReceivedChunk)) { |
| 736 | // eslint-disable-next-line no-restricted-syntax |
| 737 | replies.push(...request.replies) |
| 738 | continue |
| 739 | } |
| 740 | const sequence = request.lastReceivedChunk.value.sequence |
| 741 | for (const reply of request.replies) { |
| 742 | if (reply._tag === "Chunk" && reply.sequence <= sequence) { |
| 743 | continue |
nothing calls this directly
no test coverage detected