| 325 | } |
| 326 | |
| 327 | _scanForJobs(key, cursor, size, set, cb) { |
| 328 | const batchCount = Math.min(size, this.settings.redisScanCount); |
| 329 | this.client.sscan(key, cursor, 'COUNT', batchCount, (err, results) => { |
| 330 | /* istanbul ignore if */ |
| 331 | if (err) { |
| 332 | return cb(err); |
| 333 | } |
| 334 | |
| 335 | const nextCursor = results[0]; |
| 336 | const ids = results[1]; |
| 337 | |
| 338 | // A given element may be returned multiple times in SSCAN. |
| 339 | // So, we use a set to remove duplicates. |
| 340 | // https://redis.io/commands/scan#scan-guarantees |
| 341 | for (const id of ids) { |
| 342 | // For small sets, encoded as intsets, SSCAN will ignore COUNT. |
| 343 | // https://redis.io/commands/scan#the-count-option |
| 344 | if (set.size === size) break; |
| 345 | |
| 346 | set.add(id); |
| 347 | } |
| 348 | |
| 349 | if (nextCursor === '0' || set.size >= size) { |
| 350 | return cb(null, set); |
| 351 | } |
| 352 | |
| 353 | this._scanForJobs(key, nextCursor, size, set, cb); |
| 354 | }); |
| 355 | } |
| 356 | |
| 357 | _addJobsByIds(jobs, ids) { |
| 358 | // We need to re-ensure the queue is commandable, as we might be shutting |