( req *model.RecordsToStreamRequest[model.RecordItems], numericTruncator model.StreamNumericTruncator, )
| 14 | ) |
| 15 | |
| 16 | func RecordsToRawTableStream( |
| 17 | req *model.RecordsToStreamRequest[model.RecordItems], numericTruncator model.StreamNumericTruncator, |
| 18 | ) (*model.QRecordStream, error) { |
| 19 | recordStream := model.NewQRecordStream(1024) |
| 20 | recordStream.SetSchema(types.QRecordSchema{ |
| 21 | Fields: []types.QField{ |
| 22 | { |
| 23 | Name: "_peerdb_uid", |
| 24 | Type: types.QValueKindString, |
| 25 | Nullable: false, |
| 26 | }, |
| 27 | { |
| 28 | Name: "_peerdb_timestamp", |
| 29 | Type: types.QValueKindInt64, |
| 30 | Nullable: false, |
| 31 | }, |
| 32 | { |
| 33 | Name: "_peerdb_destination_table_name", |
| 34 | Type: types.QValueKindString, |
| 35 | Nullable: false, |
| 36 | }, |
| 37 | { |
| 38 | Name: "_peerdb_data", |
| 39 | Type: types.QValueKindString, |
| 40 | Nullable: false, |
| 41 | }, |
| 42 | { |
| 43 | Name: "_peerdb_record_type", |
| 44 | Type: types.QValueKindInt64, |
| 45 | Nullable: true, |
| 46 | }, |
| 47 | { |
| 48 | Name: "_peerdb_match_data", |
| 49 | Type: types.QValueKindString, |
| 50 | Nullable: true, |
| 51 | }, |
| 52 | { |
| 53 | Name: "_peerdb_batch_id", |
| 54 | Type: types.QValueKindInt64, |
| 55 | Nullable: true, |
| 56 | }, |
| 57 | { |
| 58 | Name: "_peerdb_unchanged_toast_columns", |
| 59 | Type: types.QValueKindString, |
| 60 | Nullable: true, |
| 61 | }, |
| 62 | }, |
| 63 | }) |
| 64 | |
| 65 | go func() { |
| 66 | for record := range req.GetRecords() { |
| 67 | record.PopulateCountMap(req.TableMapping) |
| 68 | qRecord, err := recordToQRecordOrError( |
| 69 | req.BatchID, record, req.TargetDWH, req.UnboundedNumericAsString, numericTruncator, |
| 70 | ) |
| 71 | if err != nil { |
| 72 | recordStream.Close(err) |
| 73 | return |
nothing calls this directly
no test coverage detected