New creates an XidMap. The zero conn must be valid for UID allocations to happen. Optionally, a badger.DB can be provided to persist the xid to uid allocations. This would add latency to the assignment operations. XidMap creates the temporary buffers inside the dir directory. The caller must ensure that the dir exists.
(opts XidMapOptions)
| 88 | // assignment operations. XidMap creates the temporary buffers inside dir directory. The caller must |
| 89 | // ensure that the dir exists. |
| 90 | func New(opts XidMapOptions) *XidMap { |
| 91 | numShards := 32 |
| 92 | xm := &XidMap{ |
| 93 | newRanges: make(chan *pb.AssignedIds, numShards), |
| 94 | shards: make([]*shard, numShards), |
| 95 | kvChan: make(chan []kv, 64), |
| 96 | dg: opts.DgClient, |
| 97 | } |
| 98 | for i := range xm.shards { |
| 99 | xm.shards[i] = &shard{ |
| 100 | tree: z.NewTree("XidMap"), |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | if opts.DB != nil { |
| 105 | // If DB is provided, let's load up all the xid -> uid mappings in memory. |
| 106 | xm.writer = opts.DB.NewWriteBatch() |
| 107 | |
| 108 | for range 16 { |
| 109 | xm.wg.Add(1) |
| 110 | go xm.dbWriter() |
| 111 | } |
| 112 | |
| 113 | err := opts.DB.View(func(txn *badger.Txn) error { |
| 114 | var count int |
| 115 | opt := badger.DefaultIteratorOptions |
| 116 | opt.PrefetchValues = false |
| 117 | itr := txn.NewIterator(opt) |
| 118 | defer itr.Close() |
| 119 | for itr.Rewind(); itr.Valid(); itr.Next() { |
| 120 | item := itr.Item() |
| 121 | key := string(item.Key()) |
| 122 | sh := xm.shardFor(key) |
| 123 | err := item.Value(func(val []byte) error { |
| 124 | uid := binary.BigEndian.Uint64(val) |
| 125 | // No need to acquire a lock. This is all serial access. |
| 126 | sh.tree.Set(farm.Fingerprint64([]byte(key)), uid) |
| 127 | return nil |
| 128 | }) |
| 129 | if err != nil { |
| 130 | return err |
| 131 | } |
| 132 | count++ |
| 133 | } |
| 134 | glog.Infof("Loaded up %d xid to uid mappings", count) |
| 135 | return nil |
| 136 | }) |
| 137 | x.Check(err) |
| 138 | } |
| 139 | |
| 140 | if opts.UidAssigner != nil { |
| 141 | xm.zc = pb.NewZeroClient(opts.UidAssigner) |
| 142 | } |
| 143 | |
| 144 | go func() { |
| 145 | const initBackoff = 10 * time.Millisecond |
| 146 | const maxBackoff = 5 * time.Second |
| 147 | backoff := initBackoff |