MCPcopy
hub / github.com/Effect-TS/effect / sendLocal

Function sendLocal

packages/cluster/src/internal/entityManager.ts:374–480  ·  view source on GitHub ↗
(
    message: Message.IncomingLocal<R>
  )

Source from the content-addressed store, hash-verified

372 )
373
374 function sendLocal<R extends Rpc.Any>(
375 message: Message.IncomingLocal<R>
376 ): Effect.Effect<void, EntityNotAssignedToRunner | MailboxFull | AlreadyProcessingMessage> {
377 return Effect.locally(
378 Effect.flatMap(
379 entities.get(message.envelope.address),
380 (server): Effect.Effect<void, EntityNotAssignedToRunner | MailboxFull | AlreadyProcessingMessage> => {
381 switch (message._tag) {
382 case "IncomingRequestLocal": {
383 // If the request is already running, then we might have more than
384 // one sender for the same request. In this case, the other senders
385 // should resume from storage only.
386 let entry = server.activeRequests.get(message.envelope.requestId)
387 if (entry || processedRequestIds.has(message.envelope.requestId)) {
388 return Effect.fail(
389 new AlreadyProcessingMessage({
390 envelopeId: message.envelope.requestId,
391 address: message.envelope.address
392 })
393 )
394 }
395
396 const rpc = entityRpcs.get(message.envelope.tag)! as any as Rpc.AnyWithProps
397 if (!storageEnabled && Context.get(rpc.annotations, Persisted)) {
398 return Effect.dieMessage(
399 "EntityManager.sendLocal: Cannot process a persisted message without MessageStorage"
400 )
401 }
402
403 // Cluster internal RPCs
404
405 // keep-alive RPC
406 if (rpc._tag === KeepAliveRpc._tag) {
407 const msg = message as unknown as Message.IncomingRequestLocal<typeof KeepAliveRpc>
408 const reply = Effect.suspend(() =>
409 Effect.orDie(retryRespond(
410 4,
411 msg.respond(
412 new Reply.WithExit<typeof KeepAliveRpc>({
413 requestId: message.envelope.requestId,
414 id: snowflakeGen.unsafeNext(),
415 exit: Exit.void
416 })
417 )
418 ))
419 )
420
421 if (server.keepAliveEnabled) return reply
422 server.keepAliveEnabled = true
423 return server.keepAliveLatch.whenOpen(Effect.suspend(() => {
424 server.keepAliveEnabled = false
425 return reply
426 })).pipe(
427 Effect.forkIn(server.scope),
428 Effect.asVoid
429 )
430 }
431

Callers 1

entityManager.ts (File) · 0.70

Calls 8

RequestId (Function) · 0.90
retryRespond (Function) · 0.85
get (Method) · 0.65
fail (Method) · 0.65
dieMessage (Method) · 0.65
pipe (Method) · 0.65
set (Method) · 0.65
write (Method) · 0.65

Tested by

no test coverage detected