| 937 | * @category protocol |
| 938 | */ |
| 939 | export const makeProtocolSocket = (options?: { |
| 940 | readonly retryTransientErrors?: boolean | undefined |
| 941 | readonly retrySchedule?: Schedule.Schedule<any, Socket.SocketError> | undefined |
| 942 | }): Effect.Effect< |
| 943 | Protocol["Type"], |
| 944 | never, |
| 945 | Scope.Scope | RpcSerialization.RpcSerialization | Socket.Socket |
| 946 | > => |
| 947 | Protocol.make(Effect.fnUntraced(function*(writeResponse) { |
| 948 | const socket = yield* Socket.Socket |
| 949 | const serialization = yield* RpcSerialization.RpcSerialization |
| 950 | const write = yield* socket.writer |
| 951 | let parser = serialization.unsafeMake() |
| 952 | const pinger = yield* makePinger(write(parser.encode(constPing)!)) |
| 953 | |
| 954 | let currentError: RpcClientError | undefined |
| 955 | const clearCurrentError = Effect.sync(() => { |
| 956 | currentError = undefined |
| 957 | }) |
| 958 | |
| 959 | yield* Effect.suspend(() => { |
| 960 | parser = serialization.unsafeMake() |
| 961 | pinger.reset() |
| 962 | return socket.runRaw((message) => { |
| 963 | try { |
| 964 | const responses = parser.decode(message) as Array<FromServerEncoded> |
| 965 | if (responses.length === 0) return |
| 966 | let i = 0 |
| 967 | return Effect.whileLoop({ |
| 968 | while: () => i < responses.length, |
| 969 | body: () => { |
| 970 | const response = responses[i++] |
| 971 | if (response._tag === "Pong") { |
| 972 | pinger.onPong() |
| 973 | } |
| 974 | return writeResponse(response) |
| 975 | }, |
| 976 | step: constVoid |
| 977 | }) |
| 978 | } catch (defect) { |
| 979 | return writeResponse({ |
| 980 | _tag: "ClientProtocolError", |
| 981 | error: new RpcClientError({ |
| 982 | reason: "Protocol", |
| 983 | message: "Error decoding message", |
| 984 | cause: Cause.fail(defect) |
| 985 | }) |
| 986 | }) |
| 987 | } |
| 988 | }, { onOpen: clearCurrentError }).pipe( |
| 989 | Effect.raceFirst(Effect.zipRight( |
| 990 | pinger.timeout, |
| 991 | Effect.fail( |
| 992 | new Socket.SocketGenericError({ |
| 993 | reason: "OpenTimeout", |
| 994 | cause: new Error("ping timeout") |
| 995 | }) |
| 996 | ) |