pollReplicaClient fetches new LTX files from the replica client and updates the page index & the current position.
(ctx context.Context)
| 2498 | // pollReplicaClient fetches new LTX files from the replica client and updates |
| 2499 | // the page index & the current position. |
| 2500 | func (f *VFSFile) pollReplicaClient(ctx context.Context) error { |
| 2501 | pos := f.Pos() |
| 2502 | f.logger.Debug("polling replica client", "txid", pos.TXID.String()) |
| 2503 | |
| 2504 | combined := make(map[uint32]ltx.PageIndexElem) |
| 2505 | |
| 2506 | f.mu.Lock() |
| 2507 | baseCommit := f.commit |
| 2508 | maxTXID1Snapshot := f.maxTXID1 |
| 2509 | f.mu.Unlock() |
| 2510 | |
| 2511 | newCommit := baseCommit |
| 2512 | replaceIndex := false |
| 2513 | |
| 2514 | maxTXID0, idx0, commit0, replace0, err := f.pollLevel(ctx, 0, pos.TXID, baseCommit) |
| 2515 | if err != nil { |
| 2516 | return fmt.Errorf("poll L0: %w", err) |
| 2517 | } |
| 2518 | if replace0 { |
| 2519 | replaceIndex = true |
| 2520 | baseCommit = commit0 |
| 2521 | newCommit = commit0 |
| 2522 | combined = idx0 |
| 2523 | } else { |
| 2524 | if len(idx0) > 0 { |
| 2525 | baseCommit = commit0 |
| 2526 | } |
| 2527 | for k, v := range idx0 { |
| 2528 | combined[k] = v |
| 2529 | } |
| 2530 | if commit0 > newCommit { |
| 2531 | newCommit = commit0 |
| 2532 | } |
| 2533 | } |
| 2534 | |
| 2535 | maxTXID1, idx1, commit1, replace1, err := f.pollLevel(ctx, 1, maxTXID1Snapshot, baseCommit) |
| 2536 | if err != nil { |
| 2537 | return fmt.Errorf("poll L1: %w", err) |
| 2538 | } |
| 2539 | if replace1 { |
| 2540 | replaceIndex = true |
| 2541 | baseCommit = commit1 |
| 2542 | newCommit = commit1 |
| 2543 | combined = idx1 |
| 2544 | } else { |
| 2545 | for k, v := range idx1 { |
| 2546 | combined[k] = v |
| 2547 | } |
| 2548 | if commit1 > newCommit { |
| 2549 | newCommit = commit1 |
| 2550 | } |
| 2551 | } |
| 2552 | |
| 2553 | // Send updates to a pending list if there are active readers. |
| 2554 | f.mu.Lock() |
| 2555 | defer f.mu.Unlock() |
| 2556 | |
| 2557 | if f.targetTime != nil { |