| 484 | } |
| 485 | |
| 486 | func (dp *DMapPipeline) execOnPartition(ctx context.Context, partID uint64) error { |
| 487 | rc, err := dp.dm.clusterClient.clientByPartID(partID) |
| 488 | if err != nil { |
| 489 | return err |
| 490 | } |
| 491 | // There is no need to protect dp.commands map and its content. |
| 492 | // It's already filled before running Exec, and it's now a read-only |
| 493 | // data structure |
| 494 | commands := dp.commands[partID] |
| 495 | pipe := rc.Pipeline() |
| 496 | |
| 497 | for _, cmd := range commands { |
| 498 | pipe.Do(ctx, cmd.Args()...) |
| 499 | } |
| 500 | |
| 501 | // Exec executes all previously queued commands using one |
| 502 | // client-server roundtrip. |
| 503 | // |
| 504 | // Exec always returns list of commands and error of the first failed |
| 505 | // command if any. |
| 506 | result, _ := pipe.Exec(ctx) |
| 507 | dp.mtx.Lock() |
| 508 | dp.result[partID] = result |
| 509 | dp.mtx.Unlock() |
| 510 | return nil |
| 511 | } |
| 512 | |
| 513 | // Exec executes all queued commands using one client-server roundtrip per partition. |
| 514 | func (dp *DMapPipeline) Exec(ctx context.Context) error { |