MCPcopy Index your code
hub / github.com/Effect-TS/effect / make

Function make

packages/cluster/src/MessageStorage.ts:348–444  ·  view source on GitHub ↗
(
  storage: Omit<
    MessageStorage["Type"],
    "registerReplyHandler" | "unregisterReplyHandler" | "unregisterShardReplyHandlers"
  >
)

Source from the content-addressed store, hash-verified

346 * @category constructors
347 */
348export const make = (
349 storage: Omit<
350 MessageStorage["Type"],
351 "registerReplyHandler" | "unregisterReplyHandler" | "unregisterShardReplyHandlers"
352 >
353): Effect.Effect<MessageStorage["Type"]> =>
354 Effect.sync(() => {
355 type ReplyHandler = {
356 readonly message: Message.OutgoingRequest<any> | Message.IncomingRequest<any>
357 readonly shardSet: Set<ReplyHandler>
358 readonly respond: (reply: Reply.ReplyWithContext<any>) => Effect.Effect<void, PersistenceError | MalformedMessage>
359 readonly resume: (effect: Effect.Effect<void, EntityNotAssignedToRunner>) => void
360 }
361 const replyHandlers = new Map<Snowflake.Snowflake, Array<ReplyHandler>>()
362 const replyHandlersShard = new Map<string, Set<ReplyHandler>>()
363 return MessageStorage.of({
364 ...storage,
365 registerReplyHandler: (message) => {
366 const requestId = message.envelope.requestId
367 return Effect.async<void, EntityNotAssignedToRunner>((resume) => {
368 const shardId = message.envelope.address.shardId.toString()
369 let handlers = replyHandlers.get(requestId)
370 if (handlers === undefined) {
371 handlers = []
372 replyHandlers.set(requestId, handlers)
373 }
374 let shardSet = replyHandlersShard.get(shardId)
375 if (!shardSet) {
376 shardSet = new Set()
377 replyHandlersShard.set(shardId, shardSet)
378 }
379 const entry: ReplyHandler = {
380 message,
381 shardSet,
382 respond: message._tag === "IncomingRequest" ? message.respond : (reply) => message.respond(reply.reply),
383 resume
384 }
385 handlers.push(entry)
386 shardSet.add(entry)
387 return Effect.sync(() => {
388 const index = handlers.indexOf(entry)
389 handlers.splice(index, 1)
390 shardSet.delete(entry)
391 })
392 })
393 },
394 unregisterReplyHandler: (requestId) =>
395 Effect.sync(() => {
396 const handlers = replyHandlers.get(requestId)
397 if (!handlers) return Effect.void
398 replyHandlers.delete(requestId)
399 for (let i = 0; i < handlers.length; i++) {
400 const handler = handlers[i]
401 handler.shardSet.delete(handler)
402 handler.resume(Effect.fail(
403 new EntityNotAssignedToRunner({
404 address: handler.message.envelope.address
405 })

Callers 1

MessageStorage.ts · File · 0.70

Calls 8

sync · Method · 0.80
resume · Method · 0.80
of · Method · 0.65
toString · Method · 0.65
get · Method · 0.65
set · Method · 0.65
add · Method · 0.65
fail · Method · 0.65

Tested by

no test coverage detected