MCPcopy
hub / github.com/openimsdk/open-im-server / seqRedisToMongo

Function seqRedisToMongo

tools/seq/internal/main.go:260–311  ·  view source on GitHub ↗
(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64)

Source from the content-addressed store, hash-verified

258}
259
260func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error {
261 var (
262 cursor uint64
263 keys []string
264 err error
265 )
266 for {
267 keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result()
268 if err != nil {
269 return err
270 }
271 if len(keys) > 0 {
272 for _, key := range keys {
273 seqStr, err := rdb.Get(ctx, key).Result()
274 if err != nil {
275 return fmt.Errorf("redis get %s failed %w", key, err)
276 }
277 seq, err := strconv.Atoi(seqStr)
278 if err != nil {
279 return fmt.Errorf("invalid %s seq %s", key, seqStr)
280 }
281 if seq < 0 {
282 return fmt.Errorf("invalid %s seq %s", key, seqStr)
283 }
284 id := strings.TrimPrefix(key, prefix)
285 redisSeq := int64(seq)
286 mongoSeq, err := getSeq(ctx, id)
287 if err != nil {
288 return fmt.Errorf("get mongo seq %s failed %w", key, err)
289 }
290 if mongoSeq < redisSeq {
291 if err := setSeq(ctx, id, redisSeq); err != nil {
292 return fmt.Errorf("set mongo seq %s failed %w", key, err)
293 }
294 }
295 if delAfter > 0 {
296 if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil {
297 return fmt.Errorf("redis expire key %s failed %w", key, err)
298 }
299 } else {
300 if err := rdb.Del(ctx, key).Err(); err != nil {
301 return fmt.Errorf("redis del key %s failed %w", key, err)
302 }
303 }
304 atomic.AddInt64(count, 1)
305 }
306 }
307 if cursor == 0 {
308 return nil
309 }
310 }
311}
312
313func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) {
314 type VersionTable struct {

Callers 1

MainFunction · 0.85

Calls 3

ErrMethod · 0.80
GetMethod · 0.65
DelMethod · 0.65

Tested by

no test coverage detected