(
message: Message.IncomingLocal<R>
)
| 372 | ) |
| 373 | |
| 374 | function sendLocal<R extends Rpc.Any>( |
| 375 | message: Message.IncomingLocal<R> |
| 376 | ): Effect.Effect<void, EntityNotAssignedToRunner | MailboxFull | AlreadyProcessingMessage> { |
| 377 | return Effect.locally( |
| 378 | Effect.flatMap( |
| 379 | entities.get(message.envelope.address), |
| 380 | (server): Effect.Effect<void, EntityNotAssignedToRunner | MailboxFull | AlreadyProcessingMessage> => { |
| 381 | switch (message._tag) { |
| 382 | case "IncomingRequestLocal": { |
| 383 | // If the request is already running, then we might have more than |
| 384 | // one sender for the same request. In this case, the other senders |
| 385 | // should resume from storage only. |
| 386 | let entry = server.activeRequests.get(message.envelope.requestId) |
| 387 | if (entry || processedRequestIds.has(message.envelope.requestId)) { |
| 388 | return Effect.fail( |
| 389 | new AlreadyProcessingMessage({ |
| 390 | envelopeId: message.envelope.requestId, |
| 391 | address: message.envelope.address |
| 392 | }) |
| 393 | ) |
| 394 | } |
| 395 | |
| 396 | const rpc = entityRpcs.get(message.envelope.tag)! as any as Rpc.AnyWithProps |
| 397 | if (!storageEnabled && Context.get(rpc.annotations, Persisted)) { |
| 398 | return Effect.dieMessage( |
| 399 | "EntityManager.sendLocal: Cannot process a persisted message without MessageStorage" |
| 400 | ) |
| 401 | } |
| 402 | |
| 403 | // Cluster internal RPCs |
| 404 | |
| 405 | // keep-alive RPC |
| 406 | if (rpc._tag === KeepAliveRpc._tag) { |
| 407 | const msg = message as unknown as Message.IncomingRequestLocal<typeof KeepAliveRpc> |
| 408 | const reply = Effect.suspend(() => |
| 409 | Effect.orDie(retryRespond( |
| 410 | 4, |
| 411 | msg.respond( |
| 412 | new Reply.WithExit<typeof KeepAliveRpc>({ |
| 413 | requestId: message.envelope.requestId, |
| 414 | id: snowflakeGen.unsafeNext(), |
| 415 | exit: Exit.void |
| 416 | }) |
| 417 | ) |
| 418 | )) |
| 419 | ) |
| 420 | |
| 421 | if (server.keepAliveEnabled) return reply |
| 422 | server.keepAliveEnabled = true |
| 423 | return server.keepAliveLatch.whenOpen(Effect.suspend(() => { |
| 424 | server.keepAliveEnabled = false |
| 425 | return reply |
| 426 | })).pipe( |
| 427 | Effect.forkIn(server.scope), |
| 428 | Effect.asVoid |
| 429 | ) |
| 430 | } |
| 431 |
no test coverage detected