MCPcopy
hub / github.com/NVIDIA/aistore / sendFromDisk

Method sendFromDisk

reb/ec.go:97–158  ·  view source on GitHub ↗

Sends a local CT, along with its EC metadata, to the default target. The CT resides on a local drive and is not loaded into an SGL, so it is simply read from disk and sent.

(ct *cluster.CT, meta *ec.Metadata, target *cluster.Snode, workFQN ...string)

Source from the content-addressed store, hash-verified

95// Sends local CT along with EC metadata to default target.
96// The CT is on a local drive and not loaded into SGL. Just read and send.
97func (reb *Reb) sendFromDisk(ct *cluster.CT, meta *ec.Metadata, target *cluster.Snode, workFQN ...string) (err error) {
98 var (
99 lom *cluster.LOM
100 roc cos.ReadOpenCloser
101 fqn = ct.FQN()
102 action = uint32(rebActRebCT)
103 )
104 debug.Assert(meta != nil)
105 if len(workFQN) != 0 {
106 fqn = workFQN[0]
107 action = rebActMoveCT
108 }
109 // TODO: unify acquiring a reader for LOM and CT
110 if ct.ContentType() == fs.ObjectType {
111 lom = cluster.AllocLOM(ct.ObjectName())
112 if err = lom.InitBck(ct.Bck().Bucket()); err != nil {
113 cluster.FreeLOM(lom)
114 return
115 }
116 lom.Lock(false)
117 if err = lom.Load(false /*cache it*/, true /*locked*/); err != nil {
118 lom.Unlock(false)
119 cluster.FreeLOM(lom)
120 return
121 }
122 } else {
123 lom = nil // sending slice; TODO: rlock
124 }
125
126 // open
127 if lom != nil {
128 defer cluster.FreeLOM(lom)
129 roc, err = lom.NewDeferROC()
130 } else {
131 roc, err = cos.NewFileHandle(fqn)
132 }
133 if err != nil {
134 return
135 }
136
137 // transmit
138 ntfn := stageNtfn{daemonID: reb.t.SID(), stage: rebStageTraverse, rebID: reb.rebID.Load(), md: meta, action: action}
139 o := transport.AllocSend()
140 o.Hdr = transport.ObjHdr{ObjName: ct.ObjectName(), ObjAttrs: cmn.ObjAttrs{Size: meta.Size}}
141 o.Hdr.Bck.Copy(ct.Bck().Bucket())
142 if lom != nil {
143 o.Hdr.ObjAttrs.CopyFrom(lom.ObjAttrs())
144 }
145 if meta.SliceID != 0 {
146 o.Hdr.ObjAttrs.Size = ec.SliceSize(meta.Size, meta.Data)
147 }
148 reb.onAir.Inc()
149 o.Hdr.Opaque = ntfn.NewPack(rebMsgEC)
150 o.Callback = reb.transportECCB
151 if err = reb.dm.Send(o, roc, target); err != nil {
152 err = fmt.Errorf("failed to send slices to nodes [%s..]: %v", target.ID(), err)
153 return
154 }

Callers 2

receiveCTMethod · 0.95
walkECMethod · 0.95

Calls 15

NewPackMethod · 0.95
xctnMethod · 0.95
AssertFunction · 0.92
AllocLOMFunction · 0.92
FreeLOMFunction · 0.92
NewFileHandleFunction · 0.92
AllocSendFunction · 0.92
SliceSizeFunction · 0.92
FQNMethod · 0.80
ContentTypeMethod · 0.80
InitBckMethod · 0.80
NewDeferROCMethod · 0.80

Tested by

no test coverage detected