MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / WriteTableBatch

Method WriteTableBatch

plugins/destination/bigquery/client/write.go:57–98  ·  view source on GitHub ↗
(ctx context.Context, name string, msgs message.WriteInserts)

Source from the content-addressed store, hash-verified

55}
56
57func (c *Client) WriteTableBatch(ctx context.Context, name string, msgs message.WriteInserts) error {
58 inserter := c.client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(name).Inserter()
59 inserter.IgnoreUnknownValues = true
60 inserter.SkipInvalidRows = false
61 batch := make([]*item, 0)
62 for _, msg := range msgs {
63 rec := msg.Record
64 for i := 0; i < int(rec.NumRows()); i++ {
65 saver := &item{Cols: make(map[string]bigquery.Value, rec.NumCols())}
66 for n, col := range rec.Columns() {
67 if col.IsValid(i) {
68 // save some bandwidth by not sending nil values
69 saver.Cols[rec.ColumnName(n)] = getValueForBigQuery(col, i)
70 }
71 }
72 batch = append(batch, saver)
73 }
74 }
75 // flush final rows
76 timeoutCtx, cancel := context.WithTimeout(ctx, writeTimeout)
77 defer cancel()
78
79 for err := inserter.Put(timeoutCtx, batch); err != nil; err = inserter.Put(timeoutCtx, batch) {
80 // check if bigquery error is 404 (table does not exist yet), then wait a bit and retry until it does exist
81 if isAPINotFoundError(err) {
82 // retry
83 c.logger.Info().Str("table", name).Msg("Table does not exist yet, waiting for it to be created before retrying write")
84 time.Sleep(1 * time.Second)
85 continue
86 }
87 if isEntityTooLargeError(err) {
88 batchData := c.serializeBatchForError(batch)
89 if batchData == "" {
90 return fmt.Errorf("batch too big to be inserted into BigQuery table %s. See limitations here https://cloud.google.com/bigquery/quotas#streaming_inserts", name)
91 }
92 return fmt.Errorf("batch too big to be inserted into BigQuery table %s. See limitations here https://cloud.google.com/bigquery/quotas#streaming_inserts. Batch data: %s", name, batchData)
93 }
94 return fmt.Errorf("failed to put item into BigQuery table %s: %w", name, err)
95 }
96
97 return c.embeddingsClient.WriteTableBatch(ctx, name, msgs)
98}
99
100func getValueForBigQuery(col arrow.Array, i int) any {
101 switch v := col.(type) {

Callers

nothing calls this directly

Calls 6

getValueForBigQueryFunction · 0.85
isAPINotFoundErrorFunction · 0.85
isEntityTooLargeErrorFunction · 0.85
ErrorfMethod · 0.80
WriteTableBatchMethod · 0.65

Tested by

no test coverage detected