MCPcopy
hub / github.com/Effect-TS/effect / makeProtocolWorker

Function makeProtocolWorker

packages/rpc/src/RpcClient.ts:1079–1224  ·  view source on GitHub ↗
(
  options: {
    readonly size: number
    readonly concurrency?: number | undefined
    readonly targetUtilization?: number | undefined
  } | {
    readonly minSize: number
    readonly maxSize: number
    readonly concurrency?: number | undefined
    readonly targetUtilization?: number | undefined
    readonly timeToLive: Duration.DurationInput
  }
)

Source from the content-addressed store, hash-verified

1077 * @category protocol
1078 */
1079export const makeProtocolWorker = (
1080 options: {
1081 readonly size: number
1082 readonly concurrency?: number | undefined
1083 readonly targetUtilization?: number | undefined
1084 } | {
1085 readonly minSize: number
1086 readonly maxSize: number
1087 readonly concurrency?: number | undefined
1088 readonly targetUtilization?: number | undefined
1089 readonly timeToLive: Duration.DurationInput
1090 }
1091): Effect.Effect<
1092 Protocol["Type"],
1093 WorkerError,
1094 Scope.Scope | Worker.PlatformWorker | Worker.Spawner
1095> =>
1096 Protocol.make(Effect.fnUntraced(function*(writeResponse) {
1097 const worker = yield* Worker.PlatformWorker
1098 const scope = yield* Effect.scope
1099 let workerId = 0
1100 const initialMessage = yield* Effect.serviceOption(RpcWorker.InitialMessage)
1101
1102 const entries = new Map<string, {
1103 readonly worker: Worker.BackingWorker<FromClientEncoded | RpcWorker.InitialMessage.Encoded, FromServerEncoded>
1104 readonly latch: Effect.Latch
1105 }>()
1106
1107 const acquire = Effect.gen(function*() {
1108 const id = workerId++
1109 const backing = yield* worker.spawn<FromClientEncoded | RpcWorker.InitialMessage.Encoded, FromServerEncoded>(id)
1110 const readyLatch = yield* Effect.makeLatch()
1111
1112 yield* backing.run((message) => {
1113 if (message[0] === 0) {
1114 return readyLatch.open
1115 }
1116 const response = message[1]
1117 if (response._tag === "Exit") {
1118 const entry = entries.get(response.requestId)
1119 if (entry) {
1120 entries.delete(response.requestId)
1121 entry.latch.unsafeOpen()
1122 return writeResponse(response)
1123 }
1124 } else if (response._tag === "Defect") {
1125 for (const [requestId, entry] of entries) {
1126 entries.delete(requestId)
1127 entry.latch.unsafeOpen()
1128 }
1129 return writeResponse(response)
1130 }
1131 return writeResponse(response)
1132 }).pipe(
1133 Effect.tapErrorCause((cause) =>
1134 writeResponse({
1135 _tag: "ClientProtocolError",
1136 error: new RpcClientError({

Callers 1

layerProtocolWorker (Function) · 0.85

Calls 9

unsafeOpen (Method) · 0.80
sync (Method) · 0.80
clear (Method) · 0.80
make (Method) · 0.65
pipe (Method) · 0.65
get (Method) · 0.65
addFinalizer (Method) · 0.65
run (Method) · 0.45
values (Method) · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…