( workspaceIds: string[], retentionDate: Date, label: string )
| 363 | } |
| 364 | |
/**
 * Runs `chunkedBatchDelete` against the `workflowExecutionLogs` table and
 * cleans up the files referenced by every selected row.
 *
 * A row is selected when all of the following hold:
 * - its `workspaceId` is in the current workspace chunk,
 * - its `startedAt` is earlier than `retentionDate`,
 * - the left-joined `pausedExecutions` row (matched on `executionId`) either
 *   has no status (including the case where no row matched) or has a status
 *   that is not listed in `LIVE_PAUSED_REFERENCE_STATUSES`.
 *
 * NOTE(review): the actual row deletion, and its ordering relative to
 * `onBatch`, is implemented inside `chunkedBatchDelete`, which is not visible
 * here — confirm against that helper.
 *
 * @param workspaceIds - Workspaces whose execution logs are considered.
 * @param retentionDate - Cutoff; only logs started before this date are selected.
 * @param label - Prefix used to build the `tableName` passed to the helper.
 * @returns The helper's result merged with the accumulated file-deletion counters.
 */
async function cleanupWorkflowExecutionLogs(
  workspaceIds: string[],
  retentionDate: Date,
  label: string
): Promise<TableCleanupResult & FileDeleteStats> {
  // One shared counter object: it is handed to deleteExecutionFiles for every row.
  const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }

  const dbStats = await chunkedBatchDelete({
    tableDef: workflowExecutionLogs,
    tableName: `${label}/workflow_execution_logs`,
    workspaceIds,
    batchSize: WORKFLOW_LOG_CLEANUP_BATCH_SIZE,
    maxBatches: WORKFLOW_LOG_CLEANUP_MAX_BATCHES,
    totalRowLimit: WORKFLOW_LOG_CLEANUP_ROW_LIMIT,
    selectChunk: (chunkIds, limit) => {
      const inWorkspaceChunk = inArray(workflowExecutionLogs.workspaceId, chunkIds)
      const startedBeforeCutoff = lt(workflowExecutionLogs.startedAt, retentionDate)
      // A null status covers both "no paused execution matched" and a null column value.
      const noLivePausedReference = or(
        isNull(pausedExecutions.status),
        notInArray(pausedExecutions.status, [...LIVE_PAUSED_REFERENCE_STATUSES])
      )
      const sameExecution = eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)

      return db
        .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
        .from(workflowExecutionLogs)
        .leftJoin(pausedExecutions, sameExecution)
        .where(and(inWorkspaceChunk, startedBeforeCutoff, noLivePausedReference))
        .limit(limit)
    },
    onBatch: async (rows) => {
      // Processed strictly one row at a time; each call updates fileStats.
      for (const { files } of rows) {
        await deleteExecutionFiles(files, fileStats)
      }
    },
  })

  return { ...dbStats, ...fileStats }
}
| 414 | |
| 415 | async function cleanupFreePlanOrphanedSnapshots(retentionHours: number): Promise<void> { |
| 416 | try { |
no test coverage detected