| 210 | type PullCallback func(msg PullMessage) error |
| 211 | |
| 212 | func PullModel(model string, id string, fn PullCallback) (*Pipeline, error) { |
| 213 | if client == nil { |
| 214 | return nil, fmt.Errorf("client not initialized") |
| 215 | } |
| 216 | mp, has := taskExecutor.GetModelPipeline(model) |
| 217 | if !has { |
| 218 | mp = newModelPipeline(taskExecutor.ctx, 100) |
| 219 | mp.pullFn = fn |
| 220 | taskExecutor.SetModelPipeline(model, mp) |
| 221 | } |
| 222 | p, err := mp.AddPipeline(id) |
| 223 | if err != nil { |
| 224 | return nil, err |
| 225 | } |
| 226 | if !has { |
| 227 | var status string |
| 228 | bars := make(map[string]*progress.Bar) |
| 229 | fn := func(resp api.ProgressResponse) error { |
| 230 | if resp.Digest != "" { |
| 231 | bar, ok := bars[resp.Digest] |
| 232 | if !ok { |
| 233 | bar = progress.NewBar(fmt.Sprintf("pulling %s...", resp.Digest[7:19]), resp.Total, resp.Completed) |
| 234 | bars[resp.Digest] = bar |
| 235 | } |
| 236 | bar.Set(resp.Completed) |
| 237 | |
| 238 | taskExecutor.msgQueue <- messageTask{ |
| 239 | message: PullMessage{ |
| 240 | Model: model, |
| 241 | Digest: resp.Digest, |
| 242 | Total: resp.Total, |
| 243 | Completed: resp.Completed, |
| 244 | Msg: bar.String(), |
| 245 | Status: resp.Status, |
| 246 | }, |
| 247 | } |
| 248 | } else if status != resp.Status { |
| 249 | taskExecutor.msgQueue <- messageTask{ |
| 250 | message: PullMessage{ |
| 251 | Model: model, |
| 252 | Digest: resp.Digest, |
| 253 | Total: resp.Total, |
| 254 | Completed: resp.Completed, |
| 255 | Msg: status, |
| 256 | Status: resp.Status, |
| 257 | }, |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | return nil |
| 262 | } |
| 263 | go func() { |
| 264 | err = client.Pull(mp.ctx, &api.PullRequest{Model: model}, fn) |
| 265 | if err != nil { |
| 266 | taskExecutor.msgQueue <- messageTask{ |
| 267 | message: PullMessage{ |
| 268 | Model: model, |
| 269 | Status: "error", |