MCPcopy Index your code
hub / github.com/anomalyco/opencode / makeRecordingSocket

Function makeRecordingSocket

packages/http-recorder/src/socket.ts:119–200  ·  view source on GitHub ↗
(
  upstream: Socket.Socket,
  cassette: CassetteService.Interface,
  name: string,
  request: WebSocketRequest,
  options: WebSocketRecorderOptions,
  redactor: Redactor,
)

Source from the content-addressed store, hash-verified

117}
118
119const makeRecordingSocket = (
120 upstream: Socket.Socket,
121 cassette: CassetteService.Interface,
122 name: string,
123 request: WebSocketRequest,
124 options: WebSocketRecorderOptions,
125 redactor: Redactor,
126) =>
127 Effect.gen(function* () {
128 const active = yield* Ref.make<ActiveRecording | undefined>(undefined)
129 const writeLock = yield* Semaphore.make(1)
130
131 return Socket.make({
132 runRaw: (handler, runOptions) =>
133 Effect.gen(function* () {
134 const state: ActiveRecording = {
135 events: [],
136 eventLock: yield* Semaphore.make(1),
137 accepting: yield* Ref.make(true),
138 opened: false,
139 valid: true,
140 }
141 const occupied = yield* Ref.modify(active, (current) => [current !== undefined, current ?? state])
142 if (occupied) return yield* Effect.die("Concurrent runs of a recorded WebSocket are not supported")
143 yield* upstream
144 .runRaw(
145 (message) => {
146 if (!Ref.getUnsafe(state.accepting)) throw new Error("WebSocket received a frame after closing")
147 state.events.push(redactEvent(encodeEvent("server", message), redactor))
148 return handler(message)
149 },
150 {
151 ...runOptions,
152 onOpen: Effect.gen(function* () {
153 state.opened = true
154 if (runOptions?.onOpen) yield* runOptions.onOpen
155 }),
156 },
157 )
158 .pipe(
159 Effect.onExit((exit) =>
160 writeLock.withPermit(
161 state.eventLock.withPermit(
162 Effect.gen(function* () {
163 yield* Ref.set(state.accepting, false)
164 yield* Ref.set(active, undefined)
165 if (!Exit.isSuccess(exit) || !state.opened || !state.valid) return
166 yield* cassette
167 .append(
168 name,
169 {
170 transport: "websocket",
171 open: openSnapshot(request, redactor),
172 events: [...state.events],
173 },
174 options.metadata,
175 )
176 .pipe(Effect.orDie)

Callers 1

recordingLayer (Function) · 0.85

Calls 11

encodeEvent (Function) · 0.85
push (Method) · 0.80
sync (Method) · 0.80
redactEvent (Function) · 0.70
openSnapshot (Function) · 0.70
get (Method) · 0.65
handler (Function) · 0.50
write (Function) · 0.50
make (Method) · 0.45
set (Method) · 0.45
append (Method) · 0.45

Tested by

no test coverage detected