Executes batched INSERT statements for the prepared writeItems.
(ctx context.Context, txn PgxExec, writeItems [][]interface{})
| 489 | |
| 490 | // For the prepared writeItems, execute insert writeItems. |
| 491 | func executeWriteTuples(ctx context.Context, txn PgxExec, writeItems [][]interface{}) error { |
| 492 | stbl := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) |
| 493 | |
| 494 | for start, totalWrites := 0, len(writeItems); start < totalWrites; start += storage.DefaultMaxTuplesPerWrite { |
| 495 | end := start + storage.DefaultMaxTuplesPerWrite |
| 496 | if end > totalWrites { |
| 497 | end = totalWrites |
| 498 | } |
| 499 | |
| 500 | writesBatch := writeItems[start:end] |
| 501 | |
| 502 | insertBuilder := stbl. |
| 503 | Insert("tuple"). |
| 504 | Columns( |
| 505 | "store", |
| 506 | "object_type", |
| 507 | "object_id", |
| 508 | "relation", |
| 509 | "_user", |
| 510 | "user_type", |
| 511 | "condition_name", |
| 512 | "condition_context", |
| 513 | "ulid", |
| 514 | "inserted_at", |
| 515 | ) |
| 516 | |
| 517 | for _, item := range writesBatch { |
| 518 | insertBuilder = insertBuilder.Values(item...) |
| 519 | } |
| 520 | |
| 521 | stmt, args, err := insertBuilder.ToSql() |
| 522 | if err != nil { |
| 523 | // Should never happen because we craft the insert statement |
| 524 | return HandleSQLError(err) |
| 525 | } |
| 526 | |
| 527 | _, err = txn.Exec(ctx, stmt, args...) |
| 528 | if err != nil { |
| 529 | dberr := HandleSQLError(err) |
| 530 | if errors.Is(dberr, storage.ErrCollision) { |
| 531 | // ErrCollision is returned on duplicate write (constraint violation), meaning we hit a race condition - someone else inserted the same row(s). |
| 532 | return storage.ErrWriteConflictOnInsert |
| 533 | } |
| 534 | return dberr |
| 535 | } |
| 536 | } |
| 537 | return nil |
| 538 | } |
| 539 | |
| 540 | func executeInsertChanges(ctx context.Context, txn PgxExec, changeLogItems [][]interface{}) error { |
| 541 | stbl := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) |
searching dependent graphs…