MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / WriteTableBatch

Method WriteTableBatch

plugins/destination/neo4j/client/write.go:13–61  ·  view source on GitHub ↗
(ctx context.Context, tableName string, msgs message.WriteInserts)

Source from the content-addressed store, hash-verified

11)
12
13func (c *Client) WriteTableBatch(ctx context.Context, tableName string, msgs message.WriteInserts) error {
14 if len(msgs) == 0 {
15 return nil
16 }
17
18 table, err := schema.NewTableFromArrowSchema(msgs[0].Record.Schema())
19 if err != nil {
20 return err
21 }
22
23 session := c.Session(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
24 defer session.Close(ctx)
25
26 rows := make([]map[string]any, 0, len(msgs))
27 for i := range msgs {
28 rows = append(rows, transformValues(msgs[i].Record)...)
29 }
30
31 var sb strings.Builder
32 pks := table.PrimaryKeys()
33 if len(pks) == 0 {
34 sb.WriteString("UNWIND $rows AS row CREATE (t:")
35 sb.WriteString(tableName)
36 sb.WriteString(") SET t = row")
37 } else {
38 sb.WriteString("UNWIND $rows AS row MERGE (t:")
39 sb.WriteString(tableName)
40 sb.WriteString(" {")
41 for i, column := range pks {
42 if i != 0 {
43 sb.WriteString(", ")
44 }
45 sb.WriteString(column)
46 sb.WriteString(": row.")
47 sb.WriteString(column)
48 }
49 sb.WriteString("}) SET t = row")
50 }
51 stmt := sb.String()
52 c.logger.Debug().Str("stmt", stmt).Any("rows", rows).Msg("Executing statement")
53 if _, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (any, error) {
54 _, err := tx.Run(ctx, stmt, map[string]any{"rows": rows})
55 return nil, err
56 }); err != nil {
57 return err
58 }
59
60 return session.Close(ctx)
61}
62
63func (c *Client) Write(ctx context.Context, msgs <-chan message.WriteMessage) error {
64 if err := c.writer.Write(ctx, msgs); err != nil {

Callers

nothing calls this directly

Calls 5

SessionMethod · 0.95
transformValuesFunction · 0.85
RunMethod · 0.80
CloseMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected