(chatId: string)
| 97 | } |
| 98 | |
/**
 * Tears down the stream registered under `chatId` and repairs observer bookkeeping.
 *
 * Steps, in order:
 *  1. If a client is registered for `chatId`, send it a final `end` / `[DONE]` frame,
 *     end its response, and drop it from `clients` (the drop happens even if the write throws).
 *  2. Send an `executionEnd` frame to every registered observer of `chatId`, then forget
 *     that observer set. Observer responses are NOT ended here.
 *  3. Remove `chatId` from every remaining observer set, discarding sets that become empty.
 *
 * @param chatId - Identifier of the client/execution being removed.
 */
removeClient(chatId: string) {
    const departing = this.clients.get(chatId)
    if (departing) {
        try {
            const endFrame = JSON.stringify({
                event: 'end',
                data: '[DONE]'
            })
            departing.response.write(`message:\ndata:${endFrame}\n\n`)
            departing.response.end()
        } catch {
            // Best effort only: a failed write/end (e.g. peer already gone) is deliberately ignored.
        } finally {
            // The registry entry is dropped regardless of whether the farewell frame went out.
            this.clients.delete(chatId)
        }
    }

    // Tell everyone watching this chatId that the execution is over. Their own responses are
    // left open so the same connection can be reused for whatever they observe next.
    const watchers = this.observers.get(chatId)
    if (watchers && watchers.size > 0) {
        // Iterate over a snapshot of the ids rather than the live Set.
        const watcherIds = [...watchers]
        watcherIds.forEach((watcherId) => {
            const watcher = this.clients.get(watcherId)
            if (!watcher) return
            try {
                const notice = JSON.stringify({ event: 'executionEnd', data: { chatId } })
                watcher.response.write(`message:\ndata:${notice}\n\n`)
            } catch {
                // A watcher we cannot write to is dropped from the client registry.
                this.clients.delete(watcherId)
            }
        })
        this.observers.delete(chatId)
    }

    // `chatId` may itself have been observing other chats. Strip it from every remaining
    // observer set now instead of waiting for a later failed write to clean it up, and
    // discard any set that this removal leaves empty.
    this.observers.forEach((memberIds, sourceId) => {
        const wasMember = memberIds.delete(chatId)
        if (wasMember && memberIds.size === 0) {
            this.observers.delete(sourceId)
        }
    })
}
| 142 | |
| 143 | streamCustomEvent(chatId: string, eventType: string, data: any) { |
| 144 | const clientResponse = { |
no test coverage detected