(config *cmn.Config)
| 435 | } |
| 436 | |
| 437 | func (reb *Reb) beginStreams(config *cmn.Config) { |
| 438 | debug.Assert(reb.stages.stage.Load() == rebStageInit) |
| 439 | |
| 440 | xreb := reb.xctn() |
| 441 | reb.dm.SetXact(xreb) |
| 442 | reb.dm.Open() |
| 443 | pushArgs := bundle.Args{ |
| 444 | Net: reb.dm.NetC(), |
| 445 | Trname: trnamePsh, |
| 446 | Multiplier: config.Rebalance.SbundleMult, |
| 447 | Extra: &transport.Extra{SenderID: xreb.ID()}, |
| 448 | } |
| 449 | reb.pushes = bundle.NewStreams(reb.t.Sowner(), reb.t.Snode(), transport.NewIntraDataClient(), pushArgs) |
| 450 | |
| 451 | reb.laterx.Store(false) |
| 452 | reb.inQueue.Store(0) |
| 453 | } |
| 454 | |
| 455 | func (reb *Reb) abortStreams() { |
| 456 | reb.dm.Abort() |
no test coverage detected