MCPcopy
hub / github.com/unkeyed/unkey / RunExport

Method RunExport

svc/ctrl/worker/auditlogexport/run_export_handler.go:51–88  ·  view source on GitHub ↗

RunExport drains the clickhouse_outbox table into ClickHouse in batches. Each batch is its own restate.Run so a crash mid-drain only replays the last incomplete batch. Within a batch: 1. SELECT outbox rows WHERE deleted_at IS NULL ORDER BY pk LIMIT batchLimit FOR UPDATE SKIP LOCKED 2. Decode the JS

(
	ctx restate.ObjectContext,
	_ *hydrav1.RunExportRequest,
)

Source from the content-addressed store, hash-verified

49// duplicate write into a noop. Marked rows stay in the table for ops to
50// re-queue (clear deleted_at) and as an audit trail of what was exported.
51func (s *Service) RunExport(
52 ctx restate.ObjectContext,
53 _ *hydrav1.RunExportRequest,
54) (*hydrav1.RunExportResponse, error) {
55 logger.Info("running audit log export")
56 start := time.Now()
57
58 var totalExported int32
59 for batchNum := 0; ; batchNum++ {
60 result, err := restate.Run(ctx, func(rc restate.RunContext) (batchResult, error) {
61 return s.exportBatch(rc)
62 }, restate.WithName(fmt.Sprintf("batch-%d", batchNum)))
63 if err != nil {
64 return nil, fmt.Errorf("batch %d: %w", batchNum, err)
65 }
66
67 totalExported += result.EventsExported
68
69 if result.EventsExported < batchLimit {
70 break
71 }
72 }
73
74 logger.Info("audit log export complete",
75 "events_exported", totalExported,
76 "elapsed", time.Since(start),
77 )
78
79 if _, err := restate.Run(ctx, func(rc restate.RunContext) (restate.Void, error) {
80 return restate.Void{}, s.heartbeat.Ping(rc)
81 }, restate.WithName("send heartbeat")); err != nil {
82 return nil, fmt.Errorf("send heartbeat: %w", err)
83 }
84
85 return &hydrav1.RunExportResponse{
86 EventsExported: totalExported,
87 }, nil
88}
89
90// exportBatch reads one batch of outbox rows, writes them to ClickHouse,
91// then marks them deleted. The whole batch runs inside a single MySQL

Callers

nothing calls this directly

Calls 4

exportBatchMethod · 0.95
NowMethod · 0.65
RunMethod · 0.65
PingMethod · 0.65

Tested by

no test coverage detected