MCPcopy
hub / github.com/simstudioai/sim / runDrain

Function runDrain

apps/sim/lib/data-drains/service.ts:34–227  ·  view source on GitHub ↗
(
  drainId: string,
  trigger: RunTrigger,
  options: { signal?: AbortSignal } = {}
)

Source from the content-addressed store, hash-verified

32 * dedupe on the per-row `id` field.
33 */
34export async function runDrain(
35 drainId: string,
36 trigger: RunTrigger,
37 options: { signal?: AbortSignal } = {}
38): Promise<RunDrainResult> {
39 const signal = options.signal ?? new AbortController().signal
40 const [drain] = await db.select().from(dataDrains).where(eq(dataDrains.id, drainId)).limit(1)
41 if (!drain) {
42 throw new Error(`Data drain not found: ${drainId}`)
43 }
44 if (!drain.enabled) {
45 return {
46 drainId,
47 runId: '',
48 status: 'skipped',
49 rowsExported: 0,
50 bytesWritten: 0,
51 cursorBefore: drain.cursor,
52 cursorAfter: drain.cursor,
53 locators: [],
54 }
55 }
56
57 const source = getSource(drain.source)
58 const destination = getDestination(drain.destinationType)
59
60 const runId = generateId()
61 const startedAt = new Date()
62 await db.insert(dataDrainRuns).values({
63 id: runId,
64 drainId,
65 status: 'running',
66 trigger,
67 startedAt,
68 cursorBefore: drain.cursor,
69 })
70
71 const cursorBefore = drain.cursor
72 let cursor: Cursor = drain.cursor
73 let rowsExported = 0
74 let bytesWritten = 0
75 let sequence = 0
76 const locators: string[] = []
77
78 /**
79 * Schema-parse and decrypt happen *after* the run row is created so failures
80 * in either (e.g. encryption-key rotation, schema drift across versions)
81 * surface as a `failed` run row in the UI rather than vanishing into the
82 * background-job logs while `lastRunAt` quietly advances.
83 */
84 let session: ReturnType<typeof destination.openSession> | null = null
85
86 try {
87 const config = destination.configSchema.parse(drain.destinationConfig)
88 const credentials = destination.credentialsSchema.parse(
89 await decryptCredentials(drain.destinationCredentials)
90 )
91 session = destination.openSession({ config, credentials })

Callers 2

service.test.tsFile · 0.90
run-data-drain.tsFile · 0.90

Calls 15

getSourceFunction · 0.90
getDestinationFunction · 0.90
generateIdFunction · 0.90
decryptCredentialsFunction · 0.90
toErrorFunction · 0.90
parseMethod · 0.80
openSessionMethod · 0.80
pagesMethod · 0.80
joinMethod · 0.80
serializeMethod · 0.80
deliverMethod · 0.80
cursorAfterMethod · 0.80

Tested by

no test coverage detected