MCPcopy
hub / github.com/PeerDB-io/peerdb / AttachToCdcStream

Function AttachToCdcStream

flow/pua/stream_adapter.go:41–85  ·  view source on GitHub ↗
(
	ctx context.Context,
	ls *lua.LState,
	lfn *lua.LFunction,
	stream *model.CDCStream[model.RecordItems],
	onErr context.CancelCauseFunc,
)

Source from the content-addressed store, hash-verified

39}
40
41func AttachToCdcStream(
42 ctx context.Context,
43 ls *lua.LState,
44 lfn *lua.LFunction,
45 stream *model.CDCStream[model.RecordItems],
46 onErr context.CancelCauseFunc,
47) *model.CDCStream[model.RecordItems] {
48 outstream := model.NewCDCStream[model.RecordItems](0)
49
50 handleErr := func(err error) {
51 onErr(err)
52 <-ctx.Done()
53 for range stream.GetRecords() {
54 // still read records to make sure input closes first
55 }
56 }
57
58 go func() {
59 if stream.WaitAndCheckEmpty() {
60 outstream.SignalAsEmpty()
61 <-stream.GetRecords() // needed because empty signal comes before Close
62 } else {
63 outstream.SignalAsNotEmpty()
64 for record := range stream.GetRecords() {
65 ls.Push(lfn)
66 ls.Push(LuaRecord.New(ls, record))
67 if err := ls.PCall(1, 0, nil); err != nil {
68 handleErr(err)
69 break
70 }
71 err := outstream.AddRecord(ctx, record)
72 if err != nil {
73 handleErr(err)
74 break
75 }
76 }
77 }
78 outstream.SchemaDeltas = stream.SchemaDeltas
79 lastCP := stream.GetLastCheckpoint()
80 outstream.UpdateLatestCheckpointID(lastCP.ID)
81 outstream.UpdateLatestCheckpointText(lastCP.Text)
82 outstream.Close()
83 }()
84 return outstream
85}

Callers

nothing calls this directly

Calls 9

WaitAndCheckEmptyMethod · 0.80
SignalAsEmptyMethod · 0.80
SignalAsNotEmptyMethod · 0.80
AddRecordMethod · 0.80
GetLastCheckpointMethod · 0.80
CloseMethod · 0.65
GetRecordsMethod · 0.45

Tested by

no test coverage detected