github.com/Automattic/kue @v0.11.6 sqlite

96 symbols 201 edges 31 files 25 documented · 26%
README

Kue

Kue is a priority job queue backed by redis, built for node.js.

PROTIP: This is the latest Kue documentation; make sure to also read the changelist.

Upgrade Notes (Please Read)

Installation

  • Latest release:

    $ npm install kue
    
  • Master branch:

    $ npm install http://github.com/Automattic/kue/tarball/master
    

Features

  • Delayed jobs
  • Distribution of parallel work load
  • Job event and progress pubsub
  • Job TTL
  • Optional retries with backoff
  • Graceful workers shutdown
  • Full-text search capabilities
  • RESTful JSON API
  • Rich integrated UI
  • Infinite scrolling
  • UI progress indication
  • Job specific logging
  • Powered by Redis

Overview

Creating Jobs

First create a job Queue with kue.createQueue():

var kue = require('kue')
  , queue = kue.createQueue();

Calling queue.create() with the type of job ("email"), and arbitrary job data will return a Job, which can then be save()ed, adding it to redis, with a default priority level of "normal". The save() method optionally accepts a callback, responding with an error if something goes wrong. The title key is special-cased, and will display in the job listings within the UI, making it easier to find a specific job.

var job = queue.create('email', {
    title: 'welcome email for tj'
  , to: 'tj@learnboost.com'
  , template: 'welcome-email'
}).save( function(err){
   if( !err ) console.log( job.id );
});

Job Priority

To specify the priority of a job, simply invoke the priority() method with a number, or priority name, which is mapped to a number.

queue.create('email', {
    title: 'welcome email for tj'
  , to: 'tj@learnboost.com'
  , template: 'welcome-email'
}).priority('high').save();

The default priority map is as follows:

{
    low: 10
  , normal: 0
  , medium: -5
  , high: -10
  , critical: -15
};
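
As a minimal sketch of how a priority name could resolve to a number using the map above (the resolvePriority helper is hypothetical and is not part of Kue's API; judging by the map, lower numbers correspond to higher priority):

```javascript
// Default priority map, copied from the listing above.
var priorities = {
    low: 10
  , normal: 0
  , medium: -5
  , high: -10
  , critical: -15
};

// Hypothetical helper: accepts a priority name or a number and returns a number.
function resolvePriority(level) {
  if (typeof level === 'number') return level;
  if (Object.prototype.hasOwnProperty.call(priorities, level)) {
    return priorities[level];
  }
  throw new Error('unknown priority: ' + level);
}

console.log(resolvePriority('high')); // -10
console.log(resolvePriority(-7));     // -7
```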

Failure Attempts

By default jobs have only one attempt: when they fail, they are marked as failed and remain that way until you intervene. Kue lets you change this, which is useful for jobs such as sending an email, which can usually be retried without issue after a failure. To do so, invoke the .attempts() method with a number.

 queue.create('email', {
     title: 'welcome email for tj'
   , to: 'tj@learnboost.com'
   , template: 'welcome-email'
 }).priority('high').attempts(5).save();

Failure Backoff

Failed jobs are retried immediately, with no delay, even if the job had a delay set via Job#delay. If you want to delay re-attempts after a failure (known as backoff), you can use the Job#backoff method in several ways:

    // Honor job's original delay (if set) at each attempt, defaults to fixed backoff
    job.attempts(3).backoff( true )

    // Override delay value, fixed backoff
    job.attempts(3).backoff( {delay: 60*1000, type:'fixed'} )

    // Enable exponential backoff using original delay (if set)
    job.attempts(3).backoff( {type:'exponential'} )

    // Use a function to get a customized next attempt delay value
    job.attempts(3).backoff( function( attempts, delay ){
      //attempts will correspond to the nth attempt failure so it will start with 0
      //delay will be the amount of the last delay, not the initial delay unless attempts === 0
      return my_customized_calculated_delay;
    })

In the last scenario, the provided function will be executed (via eval) on each re-attempt to get the next attempt's delay value, meaning that you can't reference external/context variables within it.
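
A hedged sketch of what such a self-contained backoff function might look like. The name cappedDoubling, the 1000 ms base and the one-minute cap are illustrative assumptions. The loop only simulates the (attempts, delay) arguments described in the comments above; it does not call Kue.

```javascript
// Hypothetical custom backoff: double the previous delay, capped at one minute.
// All constants live inside the function body because the function cannot
// reference variables from the enclosing scope.
function cappedDoubling(attempts, delay) {
  var base = 1000;     // assumed first retry delay in ms when none was set
  var cap = 60 * 1000; // assumed upper bound in ms
  if (attempts === 0) return delay > 0 ? delay : base;
  return Math.min(delay * 2, cap);
}

// Simulate five consecutive failures, feeding each returned delay
// into the next call as the "last delay".
var delay = 0, schedule = [];
for (var attempts = 0; attempts < 5; attempts++) {
  delay = cappedDoubling(attempts, delay);
  schedule.push(delay);
}
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000 ]
```

Such a function would be passed in the same way as the last form listed above, for example job.attempts(5).backoff(cappedDoubling).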

Job TTL

Job producers can set an expiry value for the time their job can live in the active state, so that if workers do not reply in a timely fashion, Kue will fail the job with a TTL exceeded error message, preventing it from being stuck in the active state and spoiling concurrency.

queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save();

Job Logs

Job-specific logs enable you to expose information to the UI at any point in the job's life-time. To do so simply invoke job.log(), which accepts a message string as well as variable-arguments for sprintf-like support:

job.log('$%d sent to %s', amount, user.name);

or anything else (uses util.inspect() internally):

job.log({key: 'some key', value: 10});
job.log([1,2,3,5,8]);
job.log(10.1);
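
To illustrate the two behaviors just described, here is a minimal sketch that uses Node's util.format as a stand-in for the sprintf-like formatting. The formatLogMessage helper is hypothetical, and Kue's own internals may differ in detail.

```javascript
var util = require('util');

// Hypothetical helper mirroring the behavior described above: strings are
// treated as sprintf-like templates, anything else goes through util.inspect().
function formatLogMessage(message) {
  if (typeof message === 'string') {
    return util.format.apply(util, arguments);
  }
  return util.inspect(message);
}

console.log(formatLogMessage('$%d sent to %s', 5, 'tj')); // $5 sent to tj
console.log(formatLogMessage([1, 2, 3, 5, 8]));           // [ 1, 2, 3, 5, 8 ]
console.log(formatLogMessage(10.1));                      // 10.1
```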

Job Progress

Job progress is extremely useful for long-running jobs such as video conversion. To update the job's progress simply invoke job.progress(completed, total [, data]):

job.progress(frames, totalFrames);

data can be used to pass extra information about the job, for example a message or an object with extra contextual data about the current status.
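
As a rough sketch of how a (completed, total) pair relates to the 0-100 progress value reported to listeners, the hypothetical toPercent helper below rounds down and clamps the result. The rounding rule is an assumption of this sketch, not a statement about Kue's implementation.

```javascript
// Hypothetical helper: convert (completed, total) into a whole-number
// percentage clamped to the range 0-100.
function toPercent(completed, total) {
  if (!(total > 0)) return 0; // avoid dividing by zero or by a missing total
  var pct = Math.floor(completed * 100 / total);
  return Math.max(0, Math.min(100, pct));
}

console.log(toPercent(50, 200));  // 25
console.log(toPercent(200, 200)); // 100
console.log(toPercent(1, 3));     // 33
```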

Job Events

Job-specific events are fired on the Job instances via Redis pubsub. The following events are currently supported:

  • enqueue: the job is now queued
  • start: the job is now running
  • promotion: the job is promoted from delayed state to queued
  • progress: the job's progress, ranging from 0-100
  • failed attempt: the job has failed, but still has remaining attempts
  • failed: the job has failed and has no remaining attempts
  • complete: the job has completed
  • remove: the job has been removed

For example this may look something like the following:

var job = queue.create('video conversion', {
    title: 'converting loki\'s to avi'
  , user: 1
  , frames: 200
});

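job.on('complete', function(result){
  console.log('Job completed with data ', result);

}).on('failed attempt', function(errorMessage, doneAttempts){
  // fired after each failed attempt while retries remain
  console.log('Job attempt ' + doneAttempts + ' failed: ' + errorMessage);

}).on('failed', function(errorMessage){
  // fired once no attempts remain
  console.log('Job failed: ' + errorMessage);

}).on('progress', function(progress, data){
  console.log('\r  job #' + job.id + ' ' + progress + '% complete with data ', data );

});

// the job must be saved before it is queued and can emit events
job.save();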

Note that job-level events are not guaranteed to be received upon process restarts, since a restarted node.js process will lose the reference to the specific Job object. If you want a more reliable event handler, see Queue Events.

Note that Kue stores job objects in memory until they are complete/failed so that it can emit events on them. If you have a large number of concurrent uncompleted jobs, turn this feature off and use queue-level events for better memory scaling.

kue.createQueue({jobEvents: false})

Alternatively, you can use the job-level events() function to control whether events are fired for a specific job.

var job = queue.create('test').events(false).save();

Queue Events

Queue-level events provide access to the job-level events previously mentioned, but scoped to the Queue instance so that logic can be applied at a "global" level. An example of this is removing completed jobs:

queue.on('job enqueue', function(id, type){
  console.log( 'Job %s got queued of type %s', id, type );

}).on('job complete', function(id, result){
  kue.Job.get(id, function(err, job){
    if (err) return;
    job.remove(function(err){
      if (err) throw err;
      console.log('removed completed job #%d', job.id);
    });
  });
});

The events available are the same as mentioned in "Job Events", but prefixed with "job ".

Delayed Jobs

Delayed jobs may be scheduled to be queued for an arbitrary distance in time by invoking the .delay(ms) method, passing the number of milliseconds relative to now. Alternatively, you can pass a JavaScript Date object with a specific time in the future. This automatically flags the Job as "delayed".

var email = queue.create('email', {
    title: 'Account renewal required'
  , to: 'tj@learnboost.com'
  , template: 'renewal-email'
}).delay(milliseconds)
  .priority('high')
  .save();

Kue will check the delayed jobs with a timer, promoting them if the scheduled delay has been exceeded, defaulting to a check of top 1000 jobs every second.
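
The two ways of expressing a delay (milliseconds relative to now, or a Date in the future) can be related with a small helper. This is an illustrative sketch only. The delayInMs function is hypothetical and is not how Kue itself is guaranteed to convert the value.

```javascript
// Hypothetical helper: milliseconds from `now` until `when`, never negative.
// `when` may be a Date or a number of milliseconds relative to now.
function delayInMs(when, now) {
  now = now || new Date();
  if (when instanceof Date) {
    return Math.max(0, when.getTime() - now.getTime());
  }
  return Math.max(0, Number(when));
}

var now = new Date('2020-01-01T00:00:00Z');
var renewal = new Date('2020-01-01T00:05:00Z');
console.log(delayInMs(renewal, now)); // 300000
console.log(delayInMs(1500, now));    // 1500
```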

Processing Jobs

Processing jobs is simple with Kue. First create a Queue instance much like we do for creating jobs, providing us access to redis etc, then invoke queue.process() with the associated type. Note that unlike what the name createQueue suggests, it currently returns a singleton Queue instance. So you can configure and use only a single Queue object within your node.js process.

In the following example we pass the callback done to email. When an error occurs we invoke done(err) to tell Kue something happened; otherwise we invoke done() only when the job is complete. If this function responds with an error, it will be displayed in the UI and the job will be marked as a failure. The error object passed to done should be a standard Error.

var kue = require('kue')
 , queue = kue.createQueue();

queue.process('email', function(job, done){
  email(job.data.to, done);
});

function email(address, done) {
  if(!isValidEmail(address)) {
    //done('invalid to address') is possible but discouraged
    return done(new Error('invalid to address'));
  }
  // email send stuff...
  done();
}

Workers can also pass the job result as the second parameter to done, as in done(null, result), to store it under the Job.result key. The result is also passed to complete event handlers so that job producers can receive it if they wish.
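
A minimal sketch of a processor that reports either an Error or a result through done. The sendEmail name, the simplistic address check and the deliveredTo field are assumptions made for illustration. The handler is exercised with plain objects standing in for Kue jobs so that it runs without Redis.

```javascript
// Hypothetical processor: validates the address and reports a result object.
// In a real worker it would be registered with queue.process('email', sendEmail).
function sendEmail(job, done) {
  var to = job.data.to;
  // Deliberately simple check for illustration only; not a real validator.
  if (typeof to !== 'string' || to.indexOf('@') < 1) {
    return done(new Error('invalid to address'));
  }
  // ... email sending would happen here ...
  done(null, { deliveredTo: to });
}

// Exercise the handler with plain objects standing in for Kue jobs.
sendEmail({ data: { to: 'tj@learnboost.com' } }, function (err, result) {
  console.log(err, result); // null { deliveredTo: 'tj@learnboost.com' }
});
sendEmail({ data: { to: 'nope' } }, function (err) {
  console.log(err.message); // invalid to address
});
```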

Processing Concurrency

By default a call to queue.process() will only accept one job at a time for processing. For small tasks like sending emails this is not ideal, so we may specify the maximum active jobs for this type by passing a number:

queue.process('email', 20, function(job, done){
  // ...
});

Pause Processing

Workers can temporarily pause and resume their activity. That is, after calling pause they will receive no jobs in their process callback until resume is called. The pause function gracefully shuts down this worker, using the same internal functionality as the shutdown method in Graceful Shutdown.

queue.process('email', function(job, ctx, done){
  ctx.pause( 5000, function(err){
    console.log("Worker is paused... ");
    setTimeout( function(){ ctx.resume(); }, 10000 );
  });
});

Note that from Kue >=0.9.0 the ctx parameter is the second argument of the process callback function, and done is idiomatically always the last.

Note that the pause method signature changed in Kue >=0.9.0 to move the callback function to the last position.

Updating Progress

For a "real" example, let's say we need to compile a PDF from numerous slides with node-canvas. Our job may consist of the following data. Note that in general you should not store large data in the job itself; it's better to store references like ids, pulling them in while processing.

queue.create('slideshow pdf', {
    title: user.name + "'s slideshow"
  , slides: [...] // keys to data stored in redis, mongodb, or some other store
});

We can access this same arbitrary data within a separate process while processing, via the job.data property. In the example we render each slide one-by-one, updating the job's log and progress.

queue.process('slideshow pdf', 5, function(job, done){
  var slides = job.data.slides
    , len = slides.length;

  function next(i) {
    if (i === len) return done(); // every slide has been rendered
    var slide = slides[i]; // pretend we did a query on this slide id ;)
    job.log('rendering %dx%d slide', slide.width, slide.height);
    renderSlide(slide, function(err){
      if (err) return done(err);
      // i + 1 slides are complete once slide i has rendered
      job.progress(i + 1, len, {nextSlide : i + 1 === len ? 'itsdone' : i + 1});
      next(i + 1);
    });
  }

  next(0);
});

Graceful Shutdown

Queue#shutdown([timeout,] fn) signals all workers to stop processing after their current active job is done. Workers will wait timeout milliseconds for their active job's done to be called; otherwise they mark the active job as failed with a shutdown error reason. When all workers have told Kue they are stopped, fn is called.

var queue = require('kue').createQueue();

process.once
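
The listing above is cut off. As a hedged sketch of wiring Queue#shutdown to a termination signal, the hypothetical shutdownOnSignal helper below takes the queue and the process object as parameters. The SIGTERM signal and the 5000 ms timeout are illustrative choices, and the stand-in objects exist only so that the sketch runs without Redis or Kue installed.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: on `signal`, ask the queue to shut down and then exit.
// `queue` is expected to expose shutdown(timeout, fn) as described above;
// `proc` is anything with once() and exit(), normally the global `process`.
function shutdownOnSignal(queue, proc, signal, timeoutMs) {
  proc.once(signal, function () {
    queue.shutdown(timeoutMs, function (err) {
      if (err) console.error('Kue shutdown error:', err);
      proc.exit(err ? 1 : 0);
    });
  });
}

// Dry run with stand-ins for the queue and the process.
var fakeQueue = { shutdown: function (timeout, fn) { fn(); } };
var fakeProc = new EventEmitter();
fakeProc.exit = function (code) { console.log('exit code', code); };
shutdownOnSignal(fakeQueue, fakeProc, 'SIGTERM', 5000);
fakeProc.emit('SIGTERM'); // prints "exit code 0"
```

In a real worker the call would presumably look like shutdownOnSignal(require('kue').createQueue(), process, 'SIGTERM', 5000).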

Core symbols most depended-on inside this repo

  • o: called by 41 (lib/http/public/javascripts/jquery.ext.js)
  • request: called by 20 (lib/http/public/javascripts/main.js)
  • show: called by 7 (lib/http/public/javascripts/main.js)
  • noop: called by 6 (lib/queue/job.js)
  • next: called by 6 (examples/video.js)
  • callback: called by 5 (lib/http/public/javascripts/caustic.js)
  • getSearch: called by 4 (lib/queue/job.js)
  • relative: called by 3 (lib/http/public/javascripts/utils.js)

Shape

Function 96

Languages

TypeScript 100%

Modules by API surface

  • lib/http/public/javascripts/jquery.min.js: 44 symbols
  • lib/http/public/javascripts/main.js: 10 symbols
  • lib/queue/job.js: 5 symbols
  • lib/queue/worker.js: 4 symbols
  • lib/http/public/javascripts/utils.js: 4 symbols
  • lib/kue.js: 3 symbols
  • lib/http/routes/json.js: 3 symbols
  • lib/http/public/javascripts/caustic.js: 3 symbols
  • examples/video.js: 3 symbols
  • examples/stale.js: 3 symbols
  • examples/events.js: 3 symbols
  • lib/queue/test_mode.js: 2 symbols

Dependencies from manifests, versioned

  • async 1.4.2 · 1×
  • body-parser 1.12.2 · 1×
  • chai 3.3.0 · 1×
  • coffee-script 1.10.0 · 1×
  • express 4.12.2 · 1×
  • lodash 4.0.0 · 1×
  • mocha 2.3.3 · 1×
  • nib 1.1.2 · 1×
  • node-redis-warlock 0.2.0 · 1×
  • pug 2.0.0-beta3 · 1×
  • redis 2.6.0-2 · 1×
  • should 3.1.0 · 1×

For agents

$ claude mcp add kue \
  -- python -m otcore.mcp_server <graph>