MCPcopy Index your code
hub / github.com/simstudioai/sim / startAbortPoller

Function startAbortPoller

apps/sim/lib/copilot/request/session/abort.ts:326–387  ·  view source on GitHub ↗
(
  streamId: string,
  abortController: AbortController,
  options?: { pollMs?: number; requestId?: string; chatId?: string }
)

Source from the content-addressed store, hash-verified

324const pollingStreams = new Set<string>()
325
326export function startAbortPoller(
327 streamId: string,
328 abortController: AbortController,
329 options?: { pollMs?: number; requestId?: string; chatId?: string }
330): ReturnType<typeof setInterval> {
331 const pollMs = options?.pollMs ?? DEFAULT_ABORT_POLL_MS
332 const requestId = options?.requestId
333 const chatId = options?.chatId
334
335 let lastHeartbeatAt = Date.now()
336 let heartbeatOwnershipLost = false
337
338 return setInterval(() => {
339 if (pollingStreams.has(streamId)) return
340 pollingStreams.add(streamId)
341
342 void (async () => {
343 try {
344 const shouldAbort = await hasAbortMarker(streamId)
345 if (shouldAbort && !abortController.signal.aborted) {
346 abortController.abort(AbortReason.RedisPoller)
347 await clearAbortMarker(streamId)
348 }
349 } catch (error) {
350 logger.warn('Failed to poll stream abort marker', {
351 streamId,
352 ...(requestId ? { requestId } : {}),
353 error: toError(error).message,
354 })
355 } finally {
356 pollingStreams.delete(streamId)
357 }
358
359 if (!chatId || heartbeatOwnershipLost) return
360 if (Date.now() - lastHeartbeatAt < CHAT_STREAM_LOCK_HEARTBEAT_INTERVAL_MS) return
361
362 try {
363 const owned = await extendLock(
364 getChatStreamLockKey(chatId),
365 streamId,
366 CHAT_STREAM_LOCK_TTL_SECONDS
367 )
368 lastHeartbeatAt = Date.now()
369 if (!owned) {
370 heartbeatOwnershipLost = true
371 logger.warn('Lost ownership of chat stream lock — stopping heartbeat', {
372 chatId,
373 streamId,
374 ...(requestId ? { requestId } : {}),
375 })
376 }
377 } catch (error) {
378 logger.warn('Failed to extend chat stream lock TTL', {
379 chatId,
380 streamId,
381 ...(requestId ? { requestId } : {}),
382 error: toError(error).message,
383 })

Callers 2

startFunction · 0.90
abort.test.tsFile · 0.90

Calls 9

hasAbortMarkerFunction · 0.90
clearAbortMarkerFunction · 0.90
toErrorFunction · 0.90
extendLockFunction · 0.90
getChatStreamLockKeyFunction · 0.85
abortMethod · 0.65
warnMethod · 0.65
deleteMethod · 0.65
addMethod · 0.45

Tested by

no test coverage detected