| 318 | } |
| 319 | |
| 320 | func (m *Manager) createShard(s *extract.Shard) (err error) { |
| 321 | var ( |
| 322 | loadContent = m.dsorter.loadContent() |
| 323 | metrics = m.Metrics.Creation |
| 324 | |
| 325 | // object related variables |
| 326 | shardName = s.Name |
| 327 | |
| 328 | errCh = make(chan error, 2) |
| 329 | ) |
| 330 | // |
| 331 | // TODO: use cluster.AllocLOM, review `t.PutObject` below |
| 332 | // |
| 333 | lom := &cluster.LOM{ObjName: shardName} |
| 334 | if err = lom.InitBck(&m.rs.OutputBck); err != nil { |
| 335 | return |
| 336 | } |
| 337 | lom.SetAtimeUnix(time.Now().UnixNano()) |
| 338 | |
| 339 | if m.aborted() { |
| 340 | return newDSortAbortedError(m.ManagerUUID) |
| 341 | } |
| 342 | |
| 343 | if err := m.dsorter.preShardCreation(s.Name, lom.MpathInfo()); err != nil { |
| 344 | return err |
| 345 | } |
| 346 | defer m.dsorter.postShardCreation(lom.MpathInfo()) |
| 347 | |
| 348 | // TODO: check capacity *prior* to starting |
| 349 | if cs := fs.GetCapStatus(); cs.Err != nil { |
| 350 | return cs.Err |
| 351 | } |
| 352 | |
| 353 | beforeCreation := time.Now() |
| 354 | |
| 355 | var ( |
| 356 | wg = &sync.WaitGroup{} |
| 357 | r, w = io.Pipe() |
| 358 | n int64 |
| 359 | ) |
| 360 | wg.Add(1) |
| 361 | go func() { |
| 362 | var err error |
| 363 | if !m.rs.DryRun { |
| 364 | params := cluster.AllocPutObjParams() |
| 365 | { |
| 366 | params.WorkTag = "dsort" |
| 367 | // NOTE: we cannot allow `PutObject` to close original reader |
| 368 | // on error as it can cause panic when `CreateShard` writes data. |
| 369 | params.Reader = io.NopCloser(r) |
| 370 | params.Cksum = nil |
| 371 | params.Atime = beforeCreation |
| 372 | |
| 373 | // TODO: params.Xact = ? |
| 374 | } |
| 375 | err = m.ctx.t.PutObject(lom, params) |
| 376 | cluster.FreePutObjParams(params) |
| 377 | if err == nil { |