MCPcopy
hub / github.com/Effect-TS/effect / makeProtocolSocket

Function makeProtocolSocket

packages/rpc/src/RpcClient.ts:939–1045  ·  view source on GitHub ↗
(options?: {
  readonly retryTransientErrors?: boolean | undefined
  readonly retrySchedule?: Schedule.Schedule<any, Socket.SocketError> | undefined
})

Source from the content-addressed store, hash-verified

937 * @category protocol
938 */
939export const makeProtocolSocket = (options?: {
940 readonly retryTransientErrors?: boolean | undefined
941 readonly retrySchedule?: Schedule.Schedule<any, Socket.SocketError> | undefined
942}): Effect.Effect<
943 Protocol["Type"],
944 never,
945 Scope.Scope | RpcSerialization.RpcSerialization | Socket.Socket
946> =>
947 Protocol.make(Effect.fnUntraced(function*(writeResponse) {
948 const socket = yield* Socket.Socket
949 const serialization = yield* RpcSerialization.RpcSerialization
950 const write = yield* socket.writer
951 let parser = serialization.unsafeMake()
952 const pinger = yield* makePinger(write(parser.encode(constPing)!))
953
954 let currentError: RpcClientError | undefined
955 const clearCurrentError = Effect.sync(() => {
956 currentError = undefined
957 })
958
959 yield* Effect.suspend(() => {
960 parser = serialization.unsafeMake()
961 pinger.reset()
962 return socket.runRaw((message) => {
963 try {
964 const responses = parser.decode(message) as Array<FromServerEncoded>
965 if (responses.length === 0) return
966 let i = 0
967 return Effect.whileLoop({
968 while: () => i < responses.length,
969 body: () => {
970 const response = responses[i++]
971 if (response._tag === "Pong") {
972 pinger.onPong()
973 }
974 return writeResponse(response)
975 },
976 step: constVoid
977 })
978 } catch (defect) {
979 return writeResponse({
980 _tag: "ClientProtocolError",
981 error: new RpcClientError({
982 reason: "Protocol",
983 message: "Error decoding message",
984 cause: Cause.fail(defect)
985 })
986 })
987 }
988 }, { onOpen: clearCurrentError }).pipe(
989 Effect.raceFirst(Effect.zipRight(
990 pinger.timeout,
991 Effect.fail(
992 new Socket.SocketGenericError({
993 reason: "OpenTimeout",
994 cause: new Error("ping timeout")
995 })
996 )

Callers 1

layerProtocolSocket (Function) · 0.85

Calls 9

encode (Method) · 0.80
sync (Method) · 0.80
decode (Method) · 0.80
write (Function) · 0.70
make (Method) · 0.65
pipe (Method) · 0.65
reset (Method) · 0.65
fail (Method) · 0.65
runRaw (Method) · 0.45

Tested by

no test coverage detected