( ctx context.Context, ls *lua.LState, lfn *lua.LFunction, stream *model.CDCStream[model.RecordItems], onErr context.CancelCauseFunc, )
| 39 | } |
| 40 | |
| 41 | func AttachToCdcStream( |
| 42 | ctx context.Context, |
| 43 | ls *lua.LState, |
| 44 | lfn *lua.LFunction, |
| 45 | stream *model.CDCStream[model.RecordItems], |
| 46 | onErr context.CancelCauseFunc, |
| 47 | ) *model.CDCStream[model.RecordItems] { |
| 48 | outstream := model.NewCDCStream[model.RecordItems](0) |
| 49 | |
| 50 | handleErr := func(err error) { |
| 51 | onErr(err) |
| 52 | <-ctx.Done() |
| 53 | for range stream.GetRecords() { |
| 54 | // still read records to make sure input closes first |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | go func() { |
| 59 | if stream.WaitAndCheckEmpty() { |
| 60 | outstream.SignalAsEmpty() |
| 61 | <-stream.GetRecords() // needed because empty signal comes before Close |
| 62 | } else { |
| 63 | outstream.SignalAsNotEmpty() |
| 64 | for record := range stream.GetRecords() { |
| 65 | ls.Push(lfn) |
| 66 | ls.Push(LuaRecord.New(ls, record)) |
| 67 | if err := ls.PCall(1, 0, nil); err != nil { |
| 68 | handleErr(err) |
| 69 | break |
| 70 | } |
| 71 | err := outstream.AddRecord(ctx, record) |
| 72 | if err != nil { |
| 73 | handleErr(err) |
| 74 | break |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | outstream.SchemaDeltas = stream.SchemaDeltas |
| 79 | lastCP := stream.GetLastCheckpoint() |
| 80 | outstream.UpdateLatestCheckpointID(lastCP.ID) |
| 81 | outstream.UpdateLatestCheckpointText(lastCP.Text) |
| 82 | outstream.Close() |
| 83 | }() |
| 84 | return outstream |
| 85 | } |
nothing calls this directly
no test coverage detected