MCPcopy
hub / github.com/PeerDB-io/peerdb / RecordsToRawTableStream

Function RecordsToRawTableStream

flow/connectors/utils/stream.go:16–82  ·  view source on GitHub ↗
(
	req *model.RecordsToStreamRequest[model.RecordItems], numericTruncator model.StreamNumericTruncator,
)

Source from the content-addressed store, hash-verified

14)
15
16func RecordsToRawTableStream(
17 req *model.RecordsToStreamRequest[model.RecordItems], numericTruncator model.StreamNumericTruncator,
18) (*model.QRecordStream, error) {
19 recordStream := model.NewQRecordStream(1024)
20 recordStream.SetSchema(types.QRecordSchema{
21 Fields: []types.QField{
22 {
23 Name: "_peerdb_uid",
24 Type: types.QValueKindString,
25 Nullable: false,
26 },
27 {
28 Name: "_peerdb_timestamp",
29 Type: types.QValueKindInt64,
30 Nullable: false,
31 },
32 {
33 Name: "_peerdb_destination_table_name",
34 Type: types.QValueKindString,
35 Nullable: false,
36 },
37 {
38 Name: "_peerdb_data",
39 Type: types.QValueKindString,
40 Nullable: false,
41 },
42 {
43 Name: "_peerdb_record_type",
44 Type: types.QValueKindInt64,
45 Nullable: true,
46 },
47 {
48 Name: "_peerdb_match_data",
49 Type: types.QValueKindString,
50 Nullable: true,
51 },
52 {
53 Name: "_peerdb_batch_id",
54 Type: types.QValueKindInt64,
55 Nullable: true,
56 },
57 {
58 Name: "_peerdb_unchanged_toast_columns",
59 Type: types.QValueKindString,
60 Nullable: true,
61 },
62 },
63 })
64
65 go func() {
66 for record := range req.GetRecords() {
67 record.PopulateCountMap(req.TableMapping)
68 qRecord, err := recordToQRecordOrError(
69 req.BatchID, record, req.TargetDWH, req.UnboundedNumericAsString, numericTruncator,
70 )
71 if err != nil {
72 recordStream.Close(err)
73 return

Callers

nothing calls this directly

Calls 5

SetSchemaMethod · 0.95
CloseMethod · 0.95
recordToQRecordOrErrorFunction · 0.85
PopulateCountMapMethod · 0.65
GetRecordsMethod · 0.45

Tested by

no test coverage detected