| 258 | } |
| 259 | |
| 260 | func seqRedisToMongo(ctx context.Context, rdb redis.UniversalClient, getSeq func(ctx context.Context, id string) (int64, error), setSeq func(ctx context.Context, id string, seq int64) error, prefix string, delAfter time.Duration, count *int64) error { |
| 261 | var ( |
| 262 | cursor uint64 |
| 263 | keys []string |
| 264 | err error |
| 265 | ) |
| 266 | for { |
| 267 | keys, cursor, err = rdb.Scan(ctx, cursor, prefix+"*", batchSize).Result() |
| 268 | if err != nil { |
| 269 | return err |
| 270 | } |
| 271 | if len(keys) > 0 { |
| 272 | for _, key := range keys { |
| 273 | seqStr, err := rdb.Get(ctx, key).Result() |
| 274 | if err != nil { |
| 275 | return fmt.Errorf("redis get %s failed %w", key, err) |
| 276 | } |
| 277 | seq, err := strconv.Atoi(seqStr) |
| 278 | if err != nil { |
| 279 | return fmt.Errorf("invalid %s seq %s", key, seqStr) |
| 280 | } |
| 281 | if seq < 0 { |
| 282 | return fmt.Errorf("invalid %s seq %s", key, seqStr) |
| 283 | } |
| 284 | id := strings.TrimPrefix(key, prefix) |
| 285 | redisSeq := int64(seq) |
| 286 | mongoSeq, err := getSeq(ctx, id) |
| 287 | if err != nil { |
| 288 | return fmt.Errorf("get mongo seq %s failed %w", key, err) |
| 289 | } |
| 290 | if mongoSeq < redisSeq { |
| 291 | if err := setSeq(ctx, id, redisSeq); err != nil { |
| 292 | return fmt.Errorf("set mongo seq %s failed %w", key, err) |
| 293 | } |
| 294 | } |
| 295 | if delAfter > 0 { |
| 296 | if err := rdb.Expire(ctx, key, delAfter).Err(); err != nil { |
| 297 | return fmt.Errorf("redis expire key %s failed %w", key, err) |
| 298 | } |
| 299 | } else { |
| 300 | if err := rdb.Del(ctx, key).Err(); err != nil { |
| 301 | return fmt.Errorf("redis del key %s failed %w", key, err) |
| 302 | } |
| 303 | } |
| 304 | atomic.AddInt64(count, 1) |
| 305 | } |
| 306 | } |
| 307 | if cursor == 0 { |
| 308 | return nil |
| 309 | } |
| 310 | } |
| 311 | } |
| 312 | |
| 313 | func CheckVersion(coll *mongo.Collection, key string, currentVersion int) (converted bool, err error) { |
| 314 | type VersionTable struct { |