( workspaceIds: string[], retentionDate: Date, label: string )
| 196 | } |
| 197 | |
| 198 | async function cleanupLegacyLargeExecutionValues( |
| 199 | workspaceIds: string[], |
| 200 | retentionDate: Date, |
| 201 | label: string |
| 202 | ): Promise<LargeValueCleanupStats> { |
| 203 | const stats: LargeValueCleanupStats = { |
| 204 | largeValuesTotal: 0, |
| 205 | largeValuesDeleted: 0, |
| 206 | largeValuesDeleteFailed: 0, |
| 207 | } |
| 208 | if (workspaceIds.length === 0) return stats |
| 209 | |
| 210 | const legacyRetentionDate = new Date( |
| 211 | retentionDate.getTime() - LEGACY_LARGE_VALUE_CLEANUP_GRACE_HOURS * 60 * 60 * 1000 |
| 212 | ) |
| 213 | const workspaceChunks = chunkArray(workspaceIds, 50) |
| 214 | let attempted = 0 |
| 215 | |
| 216 | for (const chunkIds of workspaceChunks) { |
| 217 | while (attempted < LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT) { |
| 218 | const limit = Math.min( |
| 219 | LARGE_VALUE_CLEANUP_BATCH_SIZE, |
| 220 | LARGE_VALUE_CLEANUP_TOTAL_KEY_LIMIT - attempted |
| 221 | ) |
| 222 | const rows = await db |
| 223 | .select({ key: workspaceFiles.key }) |
| 224 | .from(workspaceFiles) |
| 225 | .where( |
| 226 | and( |
| 227 | inArray(workspaceFiles.workspaceId, chunkIds), |
| 228 | eq(workspaceFiles.context, 'execution'), |
| 229 | isNull(workspaceFiles.deletedAt), |
| 230 | lt(workspaceFiles.uploadedAt, legacyRetentionDate), |
| 231 | sql`${workspaceFiles.key} LIKE 'execution/%/%/%/large-value-lv_%.json'`, |
| 232 | sql`NOT EXISTS ( |
| 233 | SELECT 1 |
| 234 | FROM ${executionLargeValues} AS registered_value |
| 235 | WHERE registered_value.key = ${workspaceFiles.key} |
| 236 | )`, |
| 237 | sql`NOT EXISTS ( |
| 238 | SELECT 1 |
| 239 | FROM ${executionLargeValueReferences} AS ref |
| 240 | WHERE ref.key = ${workspaceFiles.key} |
| 241 | AND ( |
| 242 | ( |
| 243 | ref.source = 'execution_log' |
| 244 | AND EXISTS ( |
| 245 | SELECT 1 |
| 246 | FROM ${workflowExecutionLogs} AS ref_wel |
| 247 | WHERE ref_wel.execution_id = ref.execution_id |
| 248 | ) |
| 249 | ) |
| 250 | OR ( |
| 251 | ref.source = 'paused_snapshot' |
| 252 | AND EXISTS ( |
| 253 | SELECT 1 |
| 254 | FROM ${pausedExecutions} AS ref_pe |
| 255 | WHERE ref_pe.execution_id = ref.execution_id |
no test coverage detected