(ctx context.Context, w io.Writer, node *data.Node)
| 113 | } |
| 114 | |
| 115 | func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *data.Node) error { |
| 116 | wg, ctx := errgroup.WithContext(ctx) |
| 117 | limit := int(d.repo.Connections()) |
| 118 | wg.SetLimit(1 + limit) // +1 for the writer. |
| 119 | blobs := make(chan (<-chan []byte), limit) |
| 120 | |
| 121 | // Writer. |
| 122 | wg.Go(func() error { |
| 123 | for ch := range blobs { |
| 124 | select { |
| 125 | case <-ctx.Done(): |
| 126 | return ctx.Err() |
| 127 | case blob := <-ch: |
| 128 | if _, err := w.Write(blob); err != nil { |
| 129 | return err |
| 130 | } |
| 131 | } |
| 132 | } |
| 133 | return nil |
| 134 | }) |
| 135 | |
| 136 | // Start short-lived goroutines to load blobs. |
| 137 | loop: |
| 138 | for _, id := range node.Content { |
| 139 | // This needs to be buffered, so that loaders can quit |
| 140 | // without waiting for the writer. |
| 141 | ch := make(chan []byte, 1) |
| 142 | |
| 143 | wg.Go(func() error { |
| 144 | blob, err := d.cache.GetOrCompute(id, func() ([]byte, error) { |
| 145 | return d.repo.LoadBlob(ctx, restic.DataBlob, id, nil) |
| 146 | }) |
| 147 | |
| 148 | if err == nil { |
| 149 | ch <- blob |
| 150 | } |
| 151 | return err |
| 152 | }) |
| 153 | |
| 154 | select { |
| 155 | case blobs <- ch: |
| 156 | case <-ctx.Done(): |
| 157 | break loop |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | close(blobs) |
| 162 | return wg.Wait() |
| 163 | } |
no test coverage detected