MCPcopy Index your code
hub / github.com/Effect-TS/effect / MemoryDriver

Class MemoryDriver

packages/cluster/src/MessageStorage.ts:686–886  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

684 * @category Memory
685 */
686 export class MemoryDriver extends Effect.Service<MemoryDriver>()("@effect/cluster/MessageStorage/MemoryDriver", {
687 dependencies: [Snowflake.layerGenerator],
688 effect: Effect.gen(function*() {
689 const clock = yield* Effect.clock
690 const requests = new Map<string, MemoryEntry>()
691 const requestsByPrimaryKey = new Map<string, MemoryEntry>()
692 const unprocessed = new Set<Envelope.Envelope.Encoded>()
693 const replyIds = new Set<string>()
694
695 const journal: Array<Envelope.Envelope.Encoded> = []
696
697 const cursors = new WeakMap<{}, number>()
698
// Collects the envelopes in `unprocessed` that satisfy `predicate`, pairing each
// one with the last reply recorded for it (if any).
//
// - Request envelopes are looked up in `requests` by `requestId`; a request whose
//   entry has a `deliverAt` later than the current clock time is left out.
// - Every other envelope kind is returned with `lastSentReply` set to none.
//
// Returns a freshly built array; `unprocessed` and `requests` are only read here.
699 const unprocessedWith = (predicate: Predicate<Envelope.Envelope.Encoded>) => {
700 const messages: Array<{
701 readonly envelope: Envelope.Envelope.Encoded
702 readonly lastSentReply: Option.Option<Reply.ReplyEncoded<any>>
703 }> = []
// Read the clock once so every entry is compared against the same instant.
704 const now = clock.unsafeCurrentTimeMillis()
705 for (const envelope of unprocessed) {
706 if (!predicate(envelope)) {
707 continue
708 }
709 if (envelope._tag === "Request") {
710 const entry = requests.get(envelope.requestId)
// Skip requests scheduled for later. A missing entry or a falsy `deliverAt`
// is treated as "no delay" by this truthiness check.
711 if (entry?.deliverAt && entry.deliverAt > now) {
712 continue
713 }
714 messages.push({
715 envelope,
// Last element of `entry.replies`; this is undefined (and so becomes none)
// when there is no entry or the replies array is empty.
716 lastSentReply: Option.fromNullable(entry?.replies[entry.replies.length - 1])
717 })
718 } else {
// Non-request envelopes are not looked up in `requests`, so no reply is attached.
719 messages.push({
720 envelope,
721 lastSentReply: Option.none()
722 })
723 }
724 }
725 return messages
726 }
727
728 const replyLatch = yield* Effect.makeLatch()
729
730 function repliesFor(requestIds: Array<string>) {
731 const replies = Arr.empty<Reply.ReplyEncoded<any>>()
732 for (const requestId of requestIds) {
733 const request = requests.get(requestId)
734 if (!request) continue
735 else if (Option.isNone(request.lastReceivedChunk)) {
736 // eslint-disable-next-line no-restricted-syntax
737 replies.push(...request.replies)
738 continue
739 }
740 const sequence = request.lastReceivedChunk.value.sequence
741 for (const reply of request.replies) {
742 if (reply._tag === "Chunk" && reply.sequence <= sequence) {
743 continue

Callers

nothing calls this directly

Calls 12

StringInterface · 0.85
makeEncodedFunction · 0.85
syncMethod · 0.80
parseMethod · 0.80
unsafeOpenMethod · 0.80
lastMethod · 0.80
getMethod · 0.65
setMethod · 0.65
pipeMethod · 0.65
addMethod · 0.65
mapMethod · 0.65

Tested by

no test coverage detected