(
options: {
readonly size: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
} | {
readonly minSize: number
readonly maxSize: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
readonly timeToLive: Duration.DurationInput
}
)
| 1077 | * @category protocol |
| 1078 | */ |
| 1079 | export const makeProtocolWorker = ( |
| 1080 | options: { |
| 1081 | readonly size: number |
| 1082 | readonly concurrency?: number | undefined |
| 1083 | readonly targetUtilization?: number | undefined |
| 1084 | } | { |
| 1085 | readonly minSize: number |
| 1086 | readonly maxSize: number |
| 1087 | readonly concurrency?: number | undefined |
| 1088 | readonly targetUtilization?: number | undefined |
| 1089 | readonly timeToLive: Duration.DurationInput |
| 1090 | } |
| 1091 | ): Effect.Effect< |
| 1092 | Protocol["Type"], |
| 1093 | WorkerError, |
| 1094 | Scope.Scope | Worker.PlatformWorker | Worker.Spawner |
| 1095 | > => |
| 1096 | Protocol.make(Effect.fnUntraced(function*(writeResponse) { |
| 1097 | const worker = yield* Worker.PlatformWorker |
| 1098 | const scope = yield* Effect.scope |
| 1099 | let workerId = 0 |
| 1100 | const initialMessage = yield* Effect.serviceOption(RpcWorker.InitialMessage) |
| 1101 | |
| 1102 | const entries = new Map<string, { |
| 1103 | readonly worker: Worker.BackingWorker<FromClientEncoded | RpcWorker.InitialMessage.Encoded, FromServerEncoded> |
| 1104 | readonly latch: Effect.Latch |
| 1105 | }>() |
| 1106 | |
| 1107 | const acquire = Effect.gen(function*() { |
| 1108 | const id = workerId++ |
| 1109 | const backing = yield* worker.spawn<FromClientEncoded | RpcWorker.InitialMessage.Encoded, FromServerEncoded>(id) |
| 1110 | const readyLatch = yield* Effect.makeLatch() |
| 1111 | |
| 1112 | yield* backing.run((message) => { |
| 1113 | if (message[0] === 0) { |
| 1114 | return readyLatch.open |
| 1115 | } |
| 1116 | const response = message[1] |
| 1117 | if (response._tag === "Exit") { |
| 1118 | const entry = entries.get(response.requestId) |
| 1119 | if (entry) { |
| 1120 | entries.delete(response.requestId) |
| 1121 | entry.latch.unsafeOpen() |
| 1122 | return writeResponse(response) |
| 1123 | } |
| 1124 | } else if (response._tag === "Defect") { |
| 1125 | for (const [requestId, entry] of entries) { |
| 1126 | entries.delete(requestId) |
| 1127 | entry.latch.unsafeOpen() |
| 1128 | } |
| 1129 | return writeResponse(response) |
| 1130 | } |
| 1131 | return writeResponse(response) |
| 1132 | }).pipe( |
| 1133 | Effect.tapErrorCause((cause) => |
| 1134 | writeResponse({ |
| 1135 | _tag: "ClientProtocolError", |
| 1136 | error: new RpcClientError({ |
no test coverage detected
searching dependent graphs…