RunExport drains the clickhouse_outbox table into ClickHouse in batches. Each batch is its own restate.Run so a crash mid-drain only replays the last incomplete batch. Within a batch: 1. SELECT outbox rows WHERE deleted_at IS NULL ORDER BY pk LIMIT batchLimit FOR UPDATE SKIP LOCKED 2. Decode the JS
( ctx restate.ObjectContext, _ *hydrav1.RunExportRequest, )
| 49 | // duplicate write into a noop. Marked rows stay in the table for ops to |
| 50 | // re-queue (clear deleted_at) and as an audit trail of what was exported. |
| 51 | func (s *Service) RunExport( |
| 52 | ctx restate.ObjectContext, |
| 53 | _ *hydrav1.RunExportRequest, |
| 54 | ) (*hydrav1.RunExportResponse, error) { |
| 55 | logger.Info("running audit log export") |
| 56 | start := time.Now() |
| 57 | |
| 58 | var totalExported int32 |
| 59 | for batchNum := 0; ; batchNum++ { |
| 60 | result, err := restate.Run(ctx, func(rc restate.RunContext) (batchResult, error) { |
| 61 | return s.exportBatch(rc) |
| 62 | }, restate.WithName(fmt.Sprintf("batch-%d", batchNum))) |
| 63 | if err != nil { |
| 64 | return nil, fmt.Errorf("batch %d: %w", batchNum, err) |
| 65 | } |
| 66 | |
| 67 | totalExported += result.EventsExported |
| 68 | |
| 69 | if result.EventsExported < batchLimit { |
| 70 | break |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | logger.Info("audit log export complete", |
| 75 | "events_exported", totalExported, |
| 76 | "elapsed", time.Since(start), |
| 77 | ) |
| 78 | |
| 79 | if _, err := restate.Run(ctx, func(rc restate.RunContext) (restate.Void, error) { |
| 80 | return restate.Void{}, s.heartbeat.Ping(rc) |
| 81 | }, restate.WithName("send heartbeat")); err != nil { |
| 82 | return nil, fmt.Errorf("send heartbeat: %w", err) |
| 83 | } |
| 84 | |
| 85 | return &hydrav1.RunExportResponse{ |
| 86 | EventsExported: totalExported, |
| 87 | }, nil |
| 88 | } |
| 89 | |
| 90 | // exportBatch reads one batch of outbox rows, writes them to ClickHouse, |
| 91 | // then marks them deleted. The whole batch runs inside a single MySQL |
nothing calls this directly
no test coverage detected