(ctx context.Context, tableName string, msgs message.WriteInserts)
| 11 | ) |
| 12 | |
| 13 | func (c *Client) WriteTableBatch(ctx context.Context, tableName string, msgs message.WriteInserts) error { |
| 14 | if len(msgs) == 0 { |
| 15 | return nil |
| 16 | } |
| 17 | |
| 18 | table, err := schema.NewTableFromArrowSchema(msgs[0].Record.Schema()) |
| 19 | if err != nil { |
| 20 | return err |
| 21 | } |
| 22 | |
| 23 | session := c.Session(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) |
| 24 | defer session.Close(ctx) |
| 25 | |
| 26 | rows := make([]map[string]any, 0, len(msgs)) |
| 27 | for i := range msgs { |
| 28 | rows = append(rows, transformValues(msgs[i].Record)...) |
| 29 | } |
| 30 | |
| 31 | var sb strings.Builder |
| 32 | pks := table.PrimaryKeys() |
| 33 | if len(pks) == 0 { |
| 34 | sb.WriteString("UNWIND $rows AS row CREATE (t:") |
| 35 | sb.WriteString(tableName) |
| 36 | sb.WriteString(") SET t = row") |
| 37 | } else { |
| 38 | sb.WriteString("UNWIND $rows AS row MERGE (t:") |
| 39 | sb.WriteString(tableName) |
| 40 | sb.WriteString(" {") |
| 41 | for i, column := range pks { |
| 42 | if i != 0 { |
| 43 | sb.WriteString(", ") |
| 44 | } |
| 45 | sb.WriteString(column) |
| 46 | sb.WriteString(": row.") |
| 47 | sb.WriteString(column) |
| 48 | } |
| 49 | sb.WriteString("}) SET t = row") |
| 50 | } |
| 51 | stmt := sb.String() |
| 52 | c.logger.Debug().Str("stmt", stmt).Any("rows", rows).Msg("Executing statement") |
| 53 | if _, err := session.ExecuteWrite(ctx, func(tx neo4j.ManagedTransaction) (any, error) { |
| 54 | _, err := tx.Run(ctx, stmt, map[string]any{"rows": rows}) |
| 55 | return nil, err |
| 56 | }); err != nil { |
| 57 | return err |
| 58 | } |
| 59 | |
| 60 | return session.Close(ctx) |
| 61 | } |
| 62 | |
| 63 | func (c *Client) Write(ctx context.Context, msgs <-chan message.WriteMessage) error { |
| 64 | if err := c.writer.Write(ctx, msgs); err != nil { |
nothing calls this directly
no test coverage detected