CommitParallel(): sorts the operations in 8 subtree threads, executes those threads in parallel and combines them into the master tree
(unsortedOps map[uint64]valueOp)
| 191 | |
| 192 | // CommitParallel(): sorts the operations in 8 subtree threads, executes those threads in parallel and combines them into the master tree |
| 193 | func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { |
| 194 | var wg sync.WaitGroup |
| 195 | errChan := make(chan lib.ErrorI, NumSubtrees) |
| 196 | // add 16 synthetic borders to the tree |
| 197 | cleanup, err := s.addSyntheticBorders() |
| 198 | if err != nil { |
| 199 | return err |
| 200 | } |
| 201 | // collect the roots for each group (000, 001, 010, 011...) |
| 202 | roots, err := s.getSubtreeRoots() |
| 203 | if err != nil { |
| 204 | return err |
| 205 | } |
| 206 | // sort operations grouping by prefix |
| 207 | groupedByPrefix, err := s.sortOperationsByPrefix(unsortedOps) |
| 208 | if err != nil { |
| 209 | return |
| 210 | } |
| 211 | // commit each group in parallel |
| 212 | for i := 0; i < 8; i++ { |
| 213 | wg.Add(1) |
| 214 | go func(index int) { |
| 215 | defer wg.Done() |
| 216 | // create subtree |
| 217 | subtree := s.createSubtree(roots[index], groupedByPrefix[index]) |
| 218 | subtree.reset() |
| 219 | // commit the subtree |
| 220 | if e := subtree.commit(true); e != nil { |
| 221 | errChan <- e |
| 222 | } |
| 223 | }(i) |
| 224 | } |
| 225 | // wait for all goroutines to finish |
| 226 | wg.Wait() |
| 227 | close(errChan) |
| 228 | // check if any errors occurred |
| 229 | for err = range errChan { |
| 230 | if err != nil { |
| 231 | return |
| 232 | } |
| 233 | } |
| 234 | return cleanup() |
| 235 | } |
| 236 | |
| 237 | // commit(): executes the deferred operations in order (left-to-right), |
| 238 | // minimizing the amount of traversals, IOPS, and hash operations |