| 77 | func (sb *Streams) UsePDU() bool { return sb.extra.UsePDU() } |
| 78 | |
| 79 | func NewStreams(sowner cluster.Sowner, lsnode *cluster.Snode, cl transport.Client, sbArgs Args) (sb *Streams) { |
| 80 | if sbArgs.Net == "" { |
| 81 | sbArgs.Net = cmn.NetIntraData |
| 82 | } else { |
| 83 | debug.Assertf(cmn.NetworkIsKnown(sbArgs.Net), "Unknown network %s, expecting one of: %v", |
| 84 | sbArgs.Net, cmn.KnownNetworks) |
| 85 | } |
| 86 | debug.Assert(sbArgs.Ntype == cluster.Targets || sbArgs.Ntype == cluster.Proxies || sbArgs.Ntype == cluster.AllNodes) |
| 87 | listeners := sowner.Listeners() |
| 88 | sb = &Streams{ |
| 89 | sowner: sowner, |
| 90 | smap: &cluster.Smap{}, |
| 91 | smaplock: &sync.Mutex{}, |
| 92 | lsnode: lsnode, |
| 93 | client: cl, |
| 94 | network: sbArgs.Net, |
| 95 | trname: sbArgs.Trname, |
| 96 | rxNodeType: sbArgs.Ntype, |
| 97 | multiplier: sbArgs.Multiplier, |
| 98 | manualResync: sbArgs.ManualResync, |
| 99 | } |
| 100 | if sbArgs.Extra != nil { |
| 101 | sb.extra = *sbArgs.Extra |
| 102 | } |
| 103 | if sb.multiplier == 0 { |
| 104 | sb.multiplier = 1 |
| 105 | } |
| 106 | if sb.extra.Config == nil { |
| 107 | sb.extra.Config = cmn.GCO.Get() |
| 108 | } |
| 109 | if !sb.extra.Compressed() { |
| 110 | sb.lid = fmt.Sprintf("sb[%s-%s-%s]", sb.lsnode.ID(), sb.network, sb.trname) |
| 111 | } else { |
| 112 | sb.lid = fmt.Sprintf("sb[%s-%s-%s[%s]]", sb.lsnode.ID(), sb.network, sb.trname, |
| 113 | cos.B2S(int64(sb.extra.Config.Transport.LZ4BlockMaxSize), 0)) |
| 114 | } |
| 115 | |
| 116 | // update streams when Smap changes |
| 117 | sb.Resync() |
| 118 | |
| 119 | // register this stream-bundle as Smap listener |
| 120 | if !sb.manualResync { |
| 121 | listeners.Reg(sb) |
| 122 | } |
| 123 | return |
| 124 | } |
| 125 | |
| 126 | // Close closes all contained streams and unregisters the bundle from Smap listeners; |
| 127 | // graceful=true blocks until all pending objects get completed (for "completion", see transport/README.md) |