init initializes all necessary fields. PRECONDITION: `m.mu` must be locked.
(rs *ParsedRequestSpec)
| 170 | // |
| 171 | // PRECONDITION: `m.mu` must be locked. |
| 172 | func (m *Manager) init(rs *ParsedRequestSpec) error { |
| 173 | debug.AssertMutexLocked(&m.mu) |
| 174 | |
| 175 | m.ctx = ctx |
| 176 | m.smap = m.ctx.smapOwner.Get() |
| 177 | |
| 178 | targetCount := m.smap.CountActiveTargets() |
| 179 | |
| 180 | m.rs = rs |
| 181 | m.Metrics = newMetrics(rs.Description, rs.ExtendedMetrics) |
| 182 | m.startShardCreation = make(chan struct{}, 1) |
| 183 | |
| 184 | m.ctx.smapOwner.Listeners().Reg(m) |
| 185 | |
| 186 | if err := m.setDSorter(); err != nil { |
| 187 | return err |
| 188 | } |
| 189 | |
| 190 | if err := m.dsorter.init(); err != nil { |
| 191 | return err |
| 192 | } |
| 193 | |
| 194 | // Set extract creator depending on extension provided by the user |
| 195 | if err := m.setExtractCreator(); err != nil { |
| 196 | return err |
| 197 | } |
| 198 | |
| 199 | // NOTE: Total size of the records metadata can sometimes be large |
| 200 | // and so this is why we need such a long timeout. |
| 201 | config := cmn.GCO.Get() |
| 202 | m.client = cmn.NewClient(cmn.TransportArgs{ |
| 203 | DialTimeout: 5 * time.Minute, |
| 204 | Timeout: 30 * time.Minute, |
| 205 | UseHTTPS: config.Net.HTTP.UseHTTPS, |
| 206 | SkipVerify: config.Net.HTTP.SkipVerify, |
| 207 | }) |
| 208 | |
| 209 | m.fileExtension = rs.Extension |
| 210 | m.received.ch = make(chan int32, 10) |
| 211 | |
| 212 | // By default we want avg compression ratio to be equal to 1 |
| 213 | m.compression.compressed = *atomic.NewInt64(1) |
| 214 | m.compression.uncompressed = *atomic.NewInt64(1) |
| 215 | |
| 216 | // Concurrency |
| 217 | |
| 218 | // Number of goroutines should be larger than number of concurrency limit |
| 219 | // but it should not be: |
| 220 | // * too small - we don't want to artificially bottleneck the phases. |
| 221 | // * too large - we don't want too much goroutines in the system, it can cause |
| 222 | // too much overhead on context switching and managing the goroutines. |
| 223 | // Also for large workloads goroutines can take a lot of memory. |
| 224 | // |
| 225 | // Coefficient for extraction should be larger and depends on target count |
| 226 | // because we will skip a lot shards (which do not belong to us). |
| 227 | m.extractionPhase.adjuster = newConcAdjuster( |
| 228 | rs.ExtractConcMaxLimit, |
| 229 | 2*targetCount, /*goroutineLimitCoef*/ |
nothing calls this directly
no test coverage detected