MCPcopy
hub / github.com/dittofeed/dittofeed / backfillInternalEvents

Function backfillInternalEvents

packages/admin-cli/src/upgrades.ts:574–929  ·  view source on GitHub ↗
({
  // defaults to 1 day in minutes
  intervalMinutes = 1440,
  workspaceIds,
  startDate: startDateOverride,
  endDate: endDateOverride,
  forceFullBackfill = false,
  // defaults to 10000 rows per batch within a time window
  limit = 10000,
  dryRun = false,
}: {
  intervalMinutes?: number;
  workspaceIds?: string[];
  startDate?: string;
  endDate?: string;
  forceFullBackfill?: boolean;
  limit?: number;
  dryRun?: boolean;
})

Source from the content-addressed store, hash-verified

572}
573
574export async function backfillInternalEvents({
575 // defaults to 1 day in minutes
576 intervalMinutes = 1440,
577 workspaceIds,
578 startDate: startDateOverride,
579 endDate: endDateOverride,
580 forceFullBackfill = false,
581 // defaults to 10000 rows per batch within a time window
582 limit = 10000,
583 dryRun = false,
584}: {
585 intervalMinutes?: number;
586 workspaceIds?: string[];
587 startDate?: string;
588 endDate?: string;
589 forceFullBackfill?: boolean;
590 limit?: number;
591 dryRun?: boolean;
592}) {
593 logger().info(
594 dryRun
595 ? "Analyzing internal events backfill (dry run)"
596 : "Backfilling internal events",
597 );
598
599 // Determine start date
600 let startDate: Date;
601 if (startDateOverride) {
602 startDate = new Date(startDateOverride);
603 logger().info(
604 { startDate, override: startDateOverride },
605 "Using manual start date override",
606 );
607 } else if (forceFullBackfill) {
608 // Skip internal_events check and always use min from user_events_v2
609 logger().info(
610 "Force full backfill enabled, skipping internal_events check",
611 );
612 try {
613 const userEventsQb = new ClickHouseQueryBuilder();
614 const userEventsWorkspaceFilter = workspaceIds
615 ? `AND workspace_id IN ${userEventsQb.addQueryValue(workspaceIds, "Array(String)")}`
616 : "";
617
618 const userEventsResult = await query({
619 query: `SELECT min(processing_time) as min_time FROM user_events_v2 WHERE event_type = 'track' AND startsWith(event, 'DF') ${userEventsWorkspaceFilter}`,
620 query_params: userEventsQb.getQueries(),
621 clickhouse_settings: { wait_end_of_query: 1 },
622 });
623
624 const minTimeResult = await userEventsResult.json<{ min_time: string }>();
625 const minTime = minTimeResult[0]?.min_time;
626
627 logger().debug(
628 { minTimeResult, minTime },
629 "Raw min time result from user_events_v2 (force full backfill)",
630 );
631

Callers 2

createCommands (Function) · 0.90

Calls 6

addQueryValue (Method) · 0.95
getQueries (Method) · 0.95
query (Function) · 0.90
command (Function) · 0.90
parseInt (Function) · 0.90
logger (Function) · 0.85

Tested by

no test coverage detected