({
// defaults to 1 day in minutes
intervalMinutes = 1440,
workspaceIds,
startDate: startDateOverride,
endDate: endDateOverride,
forceFullBackfill = false,
// defaults to 10000 rows per batch within a time window
limit = 10000,
dryRun = false,
}: {
intervalMinutes?: number;
workspaceIds?: string[];
startDate?: string;
endDate?: string;
forceFullBackfill?: boolean;
limit?: number;
dryRun?: boolean;
})
| 572 | } |
| 573 | |
| 574 | export async function backfillInternalEvents({ |
| 575 | // defaults to 1 day in minutes |
| 576 | intervalMinutes = 1440, |
| 577 | workspaceIds, |
| 578 | startDate: startDateOverride, |
| 579 | endDate: endDateOverride, |
| 580 | forceFullBackfill = false, |
| 581 | // defaults to 10000 rows per batch within a time window |
| 582 | limit = 10000, |
| 583 | dryRun = false, |
| 584 | }: { |
| 585 | intervalMinutes?: number; |
| 586 | workspaceIds?: string[]; |
| 587 | startDate?: string; |
| 588 | endDate?: string; |
| 589 | forceFullBackfill?: boolean; |
| 590 | limit?: number; |
| 591 | dryRun?: boolean; |
| 592 | }) { |
| 593 | logger().info( |
| 594 | dryRun |
| 595 | ? "Analyzing internal events backfill (dry run)" |
| 596 | : "Backfilling internal events", |
| 597 | ); |
| 598 | |
| 599 | // Determine start date |
| 600 | let startDate: Date; |
| 601 | if (startDateOverride) { |
| 602 | startDate = new Date(startDateOverride); |
| 603 | logger().info( |
| 604 | { startDate, override: startDateOverride }, |
| 605 | "Using manual start date override", |
| 606 | ); |
| 607 | } else if (forceFullBackfill) { |
| 608 | // Skip internal_events check and always use min from user_events_v2 |
| 609 | logger().info( |
| 610 | "Force full backfill enabled, skipping internal_events check", |
| 611 | ); |
| 612 | try { |
| 613 | const userEventsQb = new ClickHouseQueryBuilder(); |
| 614 | const userEventsWorkspaceFilter = workspaceIds |
| 615 | ? `AND workspace_id IN ${userEventsQb.addQueryValue(workspaceIds, "Array(String)")}` |
| 616 | : ""; |
| 617 | |
| 618 | const userEventsResult = await query({ |
| 619 | query: `SELECT min(processing_time) as min_time FROM user_events_v2 WHERE event_type = 'track' AND startsWith(event, 'DF') ${userEventsWorkspaceFilter}`, |
| 620 | query_params: userEventsQb.getQueries(), |
| 621 | clickhouse_settings: { wait_end_of_query: 1 }, |
| 622 | }); |
| 623 | |
| 624 | const minTimeResult = await userEventsResult.json<{ min_time: string }>(); |
| 625 | const minTime = minTimeResult[0]?.min_time; |
| 626 | |
| 627 | logger().debug( |
| 628 | { minTimeResult, minTime }, |
| 629 | "Raw min time result from user_events_v2 (force full backfill)", |
| 630 | ); |
| 631 |
no test coverage detected