( upstream: Socket.Socket, cassette: CassetteService.Interface, name: string, request: WebSocketRequest, options: WebSocketRecorderOptions, redactor: Redactor, )
| 117 | } |
| 118 | |
| 119 | const makeRecordingSocket = ( |
| 120 | upstream: Socket.Socket, |
| 121 | cassette: CassetteService.Interface, |
| 122 | name: string, |
| 123 | request: WebSocketRequest, |
| 124 | options: WebSocketRecorderOptions, |
| 125 | redactor: Redactor, |
| 126 | ) => |
| 127 | Effect.gen(function* () { |
| 128 | const active = yield* Ref.make<ActiveRecording | undefined>(undefined) |
| 129 | const writeLock = yield* Semaphore.make(1) |
| 130 | |
| 131 | return Socket.make({ |
| 132 | runRaw: (handler, runOptions) => |
| 133 | Effect.gen(function* () { |
| 134 | const state: ActiveRecording = { |
| 135 | events: [], |
| 136 | eventLock: yield* Semaphore.make(1), |
| 137 | accepting: yield* Ref.make(true), |
| 138 | opened: false, |
| 139 | valid: true, |
| 140 | } |
| 141 | const occupied = yield* Ref.modify(active, (current) => [current !== undefined, current ?? state]) |
| 142 | if (occupied) return yield* Effect.die("Concurrent runs of a recorded WebSocket are not supported") |
| 143 | yield* upstream |
| 144 | .runRaw( |
| 145 | (message) => { |
| 146 | if (!Ref.getUnsafe(state.accepting)) throw new Error("WebSocket received a frame after closing") |
| 147 | state.events.push(redactEvent(encodeEvent("server", message), redactor)) |
| 148 | return handler(message) |
| 149 | }, |
| 150 | { |
| 151 | ...runOptions, |
| 152 | onOpen: Effect.gen(function* () { |
| 153 | state.opened = true |
| 154 | if (runOptions?.onOpen) yield* runOptions.onOpen |
| 155 | }), |
| 156 | }, |
| 157 | ) |
| 158 | .pipe( |
| 159 | Effect.onExit((exit) => |
| 160 | writeLock.withPermit( |
| 161 | state.eventLock.withPermit( |
| 162 | Effect.gen(function* () { |
| 163 | yield* Ref.set(state.accepting, false) |
| 164 | yield* Ref.set(active, undefined) |
| 165 | if (!Exit.isSuccess(exit) || !state.opened || !state.valid) return |
| 166 | yield* cassette |
| 167 | .append( |
| 168 | name, |
| 169 | { |
| 170 | transport: "websocket", |
| 171 | open: openSnapshot(request, redactor), |
| 172 | events: [...state.events], |
| 173 | }, |
| 174 | options.metadata, |
| 175 | ) |
| 176 | .pipe(Effect.orDie) |
no test coverage detected