| 346 | * @category constructors |
| 347 | */ |
| 348 | export const make = ( |
| 349 | storage: Omit< |
| 350 | MessageStorage["Type"], |
| 351 | "registerReplyHandler" | "unregisterReplyHandler" | "unregisterShardReplyHandlers" |
| 352 | > |
| 353 | ): Effect.Effect<MessageStorage["Type"]> => |
| 354 | Effect.sync(() => { |
/** Bookkeeping record for a single pending `registerReplyHandler` call. */
interface ReplyHandler {
  /** The request that was handed to `registerReplyHandler`. */
  readonly message: Message.OutgoingRequest<any> | Message.IncomingRequest<any>
  /** The per-shard set this record was added to, kept so it can be removed directly. */
  readonly shardSet: Set<ReplyHandler>
  /** Forwards a reply to the request's own `respond` function. */
  readonly respond: (reply: Reply.ReplyWithContext<any>) => Effect.Effect<void, PersistenceError | MalformedMessage>
  /** Callback of the suspended registration effect; completes it with the given effect. */
  readonly resume: (effect: Effect.Effect<void, EntityNotAssignedToRunner>) => void
}
// Pending handlers grouped by the request id of the message they were registered for.
const replyHandlers: Map<Snowflake.Snowflake, Array<ReplyHandler>> = new Map()
// The same handlers grouped by the stringified shard id of the message's address.
const replyHandlersShard: Map<string, Set<ReplyHandler>> = new Map()
| 363 | return MessageStorage.of({ |
| 364 | ...storage, |
/**
 * Registers a reply handler for `message` and suspends until the handler is
 * resumed through its `resume` callback (for example by
 * `unregisterReplyHandler`, which fails it with `EntityNotAssignedToRunner`).
 *
 * The handler is indexed both by request id and by the stringified shard id
 * of the message's address. If the returned effect is interrupted, the
 * handler is removed from both indexes again.
 */
registerReplyHandler: (message) => {
  const requestId = message.envelope.requestId
  return Effect.async<void, EntityNotAssignedToRunner>((resume) => {
    const shardId = message.envelope.address.shardId.toString()

    // Bind the collections as `const` so the finalizer closure below sees
    // them as definitely defined.
    const existingHandlers = replyHandlers.get(requestId)
    const handlers = existingHandlers ?? []
    if (existingHandlers === undefined) {
      replyHandlers.set(requestId, handlers)
    }

    const existingShardSet = replyHandlersShard.get(shardId)
    const shardSet = existingShardSet ?? new Set<ReplyHandler>()
    if (existingShardSet === undefined) {
      replyHandlersShard.set(shardId, shardSet)
    }

    const entry: ReplyHandler = {
      message,
      shardSet,
      // Incoming requests take the reply with its context; outgoing requests
      // only take the bare reply.
      respond: message._tag === "IncomingRequest" ? message.respond : (reply) => message.respond(reply.reply),
      resume
    }
    handlers.push(entry)
    shardSet.add(entry)

    // Finalizer: runs when the suspended effect is interrupted.
    return Effect.sync(() => {
      const index = handlers.indexOf(entry)
      // Guard against -1: `splice(-1, 1)` would drop the last, unrelated handler.
      if (index !== -1) {
        handlers.splice(index, 1)
      }
      // Drop the now-empty list so the map does not grow with every
      // interrupted registration. The identity check avoids deleting a list
      // that was re-created for the same request id in the meantime.
      if (handlers.length === 0 && replyHandlers.get(requestId) === handlers) {
        replyHandlers.delete(requestId)
      }
      shardSet.delete(entry)
    })
  })
},
| 394 | unregisterReplyHandler: (requestId) => |
| 395 | Effect.sync(() => { |
| 396 | const handlers = replyHandlers.get(requestId) |
| 397 | if (!handlers) return Effect.void |
| 398 | replyHandlers.delete(requestId) |
| 399 | for (let i = 0; i < handlers.length; i++) { |
| 400 | const handler = handlers[i] |
| 401 | handler.shardSet.delete(handler) |
| 402 | handler.resume(Effect.fail( |
| 403 | new EntityNotAssignedToRunner({ |
| 404 | address: handler.message.envelope.address |
| 405 | }) |