MCPcopy
hub / github.com/NVIDIA/aistore / NewStreams

Function NewStreams

transport/bundle/stream_bundle.go:79–124  ·  view source on GitHub ↗
(sowner cluster.Sowner, lsnode *cluster.Snode, cl transport.Client, sbArgs Args)

Source from the content-addressed store, hash-verified

77func (sb *Streams) UsePDU() bool { return sb.extra.UsePDU() }
78
79func NewStreams(sowner cluster.Sowner, lsnode *cluster.Snode, cl transport.Client, sbArgs Args) (sb *Streams) {
80 if sbArgs.Net == "" {
81 sbArgs.Net = cmn.NetIntraData
82 } else {
83 debug.Assertf(cmn.NetworkIsKnown(sbArgs.Net), "Unknown network %s, expecting one of: %v",
84 sbArgs.Net, cmn.KnownNetworks)
85 }
86 debug.Assert(sbArgs.Ntype == cluster.Targets || sbArgs.Ntype == cluster.Proxies || sbArgs.Ntype == cluster.AllNodes)
87 listeners := sowner.Listeners()
88 sb = &Streams{
89 sowner: sowner,
90 smap: &cluster.Smap{},
91 smaplock: &sync.Mutex{},
92 lsnode: lsnode,
93 client: cl,
94 network: sbArgs.Net,
95 trname: sbArgs.Trname,
96 rxNodeType: sbArgs.Ntype,
97 multiplier: sbArgs.Multiplier,
98 manualResync: sbArgs.ManualResync,
99 }
100 if sbArgs.Extra != nil {
101 sb.extra = *sbArgs.Extra
102 }
103 if sb.multiplier == 0 {
104 sb.multiplier = 1
105 }
106 if sb.extra.Config == nil {
107 sb.extra.Config = cmn.GCO.Get()
108 }
109 if !sb.extra.Compressed() {
110 sb.lid = fmt.Sprintf("sb[%s-%s-%s]", sb.lsnode.ID(), sb.network, sb.trname)
111 } else {
112 sb.lid = fmt.Sprintf("sb[%s-%s-%s[%s]]", sb.lsnode.ID(), sb.network, sb.trname,
113 cos.B2S(int64(sb.extra.Config.Transport.LZ4BlockMaxSize), 0))
114 }
115
116 // update streams when Smap changes
117 sb.Resync()
118
119 // register this stream-bundle as Smap listener
120 if !sb.manualResync {
121 listeners.Reg(sb)
122 }
123 return
124}
125
126// Close closes all contained streams and unregisters the bundle from Smap listeners;
127// graceful=true blocks until all pending objects get completed (for "completion", see transport/README.md)

Callers 7

testBundleFunction · 0.92
startMethod · 0.92
startMethod · 0.92
initStreamsMethod · 0.92
beginStreamsMethod · 0.92
initECBundlesMethod · 0.92
OpenMethod · 0.85

Calls 10

AssertfFunction · 0.92
NetworkIsKnownFunction · 0.92
AssertFunction · 0.92
B2SFunction · 0.92
CompressedMethod · 0.80
ResyncMethod · 0.80
ListenersMethod · 0.65
GetMethod · 0.65
IDMethod · 0.65
RegMethod · 0.65

Tested by 1

testBundleFunction · 0.74