Sends local CT along with EC metadata to default target. The CT is on a local drive and not loaded into SGL. Just read and send.
(ct *cluster.CT, meta *ec.Metadata, target *cluster.Snode, workFQN ...string)
| 95 | // Sends local CT along with EC metadata to default target. |
| 96 | // The CT is on a local drive and not loaded into SGL. Just read and send. |
| 97 | func (reb *Reb) sendFromDisk(ct *cluster.CT, meta *ec.Metadata, target *cluster.Snode, workFQN ...string) (err error) { |
| 98 | var ( |
| 99 | lom *cluster.LOM |
| 100 | roc cos.ReadOpenCloser |
| 101 | fqn = ct.FQN() |
| 102 | action = uint32(rebActRebCT) |
| 103 | ) |
| 104 | debug.Assert(meta != nil) |
| 105 | if len(workFQN) != 0 { |
| 106 | fqn = workFQN[0] |
| 107 | action = rebActMoveCT |
| 108 | } |
| 109 | // TODO: unify acquiring a reader for LOM and CT |
| 110 | if ct.ContentType() == fs.ObjectType { |
| 111 | lom = cluster.AllocLOM(ct.ObjectName()) |
| 112 | if err = lom.InitBck(ct.Bck().Bucket()); err != nil { |
| 113 | cluster.FreeLOM(lom) |
| 114 | return |
| 115 | } |
| 116 | lom.Lock(false) |
| 117 | if err = lom.Load(false /*cache it*/, true /*locked*/); err != nil { |
| 118 | lom.Unlock(false) |
| 119 | cluster.FreeLOM(lom) |
| 120 | return |
| 121 | } |
| 122 | } else { |
| 123 | lom = nil // sending slice; TODO: rlock |
| 124 | } |
| 125 | |
| 126 | // open |
| 127 | if lom != nil { |
| 128 | defer cluster.FreeLOM(lom) |
| 129 | roc, err = lom.NewDeferROC() |
| 130 | } else { |
| 131 | roc, err = cos.NewFileHandle(fqn) |
| 132 | } |
| 133 | if err != nil { |
| 134 | return |
| 135 | } |
| 136 | |
| 137 | // transmit |
| 138 | ntfn := stageNtfn{daemonID: reb.t.SID(), stage: rebStageTraverse, rebID: reb.rebID.Load(), md: meta, action: action} |
| 139 | o := transport.AllocSend() |
| 140 | o.Hdr = transport.ObjHdr{ObjName: ct.ObjectName(), ObjAttrs: cmn.ObjAttrs{Size: meta.Size}} |
| 141 | o.Hdr.Bck.Copy(ct.Bck().Bucket()) |
| 142 | if lom != nil { |
| 143 | o.Hdr.ObjAttrs.CopyFrom(lom.ObjAttrs()) |
| 144 | } |
| 145 | if meta.SliceID != 0 { |
| 146 | o.Hdr.ObjAttrs.Size = ec.SliceSize(meta.Size, meta.Data) |
| 147 | } |
| 148 | reb.onAir.Inc() |
| 149 | o.Hdr.Opaque = ntfn.NewPack(rebMsgEC) |
| 150 | o.Callback = reb.transportECCB |
| 151 | if err = reb.dm.Send(o, roc, target); err != nil { |
| 152 | err = fmt.Errorf("failed to send slices to nodes [%s..]: %v", target.ID(), err) |
| 153 | return |
| 154 | } |
no test coverage detected