MCPcopy
hub / github.com/amqp-node/amqplib

github.com/amqp-node/amqplib @v2.0.1 sqlite

repository ↗ · DeepWiki ↗ · release v2.0.1 ↗
659 symbols 1,743 edges 72 files 12 documented · 2%
README

AMQP 0-9-1 library and client for Node.JS

NPM version NPM downloads Node.js CI amqplib

A library for making AMQP 0-9-1 clients for Node.JS, and an AMQP 0-9-1 client for Node.JS v10+. This library does not implement AMQP1.0 or AMQP0-10.

npm install amqplib

RabbitMQ Compatibility

Only 0.10.7 and later versions of this library are compatible with RabbitMQ 4.1.0 (and later releases).

Links

Project status

  • Expected to work
  • Complete high-level and low-level APIs (i.e., all bits of the protocol)
  • Stable APIs
  • A fair few tests
  • Measured test coverage
  • Ports of the RabbitMQ tutorials as examples
  • Used in production

Still working on:

  • Getting to 100% (or very close to 100%) test coverage

Callback API example

const amqplib = require('amqplib/callback_api');
const queue = 'tasks';

amqplib.connect('amqp://localhost', (err, conn) => {
  if (err) throw err;

  conn.on('error', (err) => { console.error('Connection error:', err); });
  conn.on('handler-error', (err, event) => { console.error(`Uncaught exception in connection ${event} listener:`, err); });

  // Listener
  conn.createChannel((err, ch2) => {
    if (err) throw err;

    ch2.on('error', (err) => { console.error('Channel error:', err); });
    ch2.on('handler-error', (err, event) => { console.error(`Uncaught exception in channel ${event} listener:`, err); });

    ch2.assertQueue(queue);

    ch2.consume(queue, (msg) => {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch2.ack(msg);
      } else {
        console.log('Consumer cancelled by server');
      }
    });
  });

  // Sender
  conn.createChannel((err, ch1) => {
    if (err) throw err;

    ch1.on('error', (err) => { console.error('Channel error:', err); });
    ch1.on('handler-error', (err, event) => { console.error(`Uncaught exception in channel ${event} listener:`, err); });
    ch1.assertQueue(queue);

    setInterval(() => {
      ch1.sendToQueue(queue, Buffer.from('something to do'));
    }, 1000);
  });
});

Promise/Async API example

const amqplib = require('amqplib');

(async () => {
  const queue = 'tasks';
  const conn = await amqplib.connect('amqp://localhost');
  conn.on('error', (err) => { console.error('Connection error:', err); });
  conn.on('handler-error', (err, event) => { console.error(`Uncaught exception in connection ${event} listener:`, err); });

  const ch1 = await conn.createChannel();
  ch1.on('error', (err) => { console.error('Channel error:', err); });
  ch1.on('handler-error', (err, event) => { console.error(`Uncaught exception in channel ${event} listener:`, err); });
  await ch1.assertQueue(queue);

  // Listener
  ch1.consume(queue, (msg) => {
    if (msg !== null) {
      console.log('Received:', msg.content.toString());
      ch1.ack(msg);
    } else {
      console.log('Consumer cancelled by server');
    }
  });

  // Sender
  const ch2 = await conn.createChannel();
  ch2.on('error', (err) => { console.error('Channel error:', err); });
  ch2.on('handler-error', (err, event) => { console.error(`Uncaught exception in channel ${event} listener:`, err); });

  setInterval(() => {
    ch2.sendToQueue(queue, Buffer.from('something to do'));
  }, 1000);
})();

Opt-in recovery

Automatic recovery is available as an opt-in feature through connect options:

const amqplib = require('amqplib');

const connection = await amqplib.connect('amqp://localhost', {
  recovery: {
    initialDelay: 200, // ms
    maxDelay: 5000, // ms
    factor: 2,
    jitter: 0.2,
    maxRetries: Infinity,
    async setup(model) {
      // Called after every successful (re)connect.
      // Recreate topology/consumers here.
      const ch = await model.createChannel();
      await ch.assertQueue('tasks', {durable: true});
    },
  },
});

connection.on('connect', () => {
  console.log('connected');
});

connection.on('disconnect', (err) => {
  console.warn('disconnected', err.message);
});

Callback API supports the same option:

const amqplib = require('amqplib/callback_api');

amqplib.connect(
  'amqp://localhost',
  {
    recovery: {
      initialDelay: 200,
      maxDelay: 5000,
      setup(model, done) {
        model.createChannel((err, ch) => {
          if (err) return done(err);
          ch.assertQueue('tasks', {durable: true}, done);
        });
      },
    },
  },
  (err, conn) => {
    if (err) throw err;
    conn.on('connect', () => console.log('connected'));
  },
);

Without recovery options, behavior is unchanged.

Error handling in event handlers

If a user-supplied event handler throws a synchronous error, the throw will propagate into amqplib internals. Depending on where in the call stack it escapes, this can silently swallow the error, or close the channel or connection.

To avoid this, register a handler-error listener on the connection and on each channel. If a listener is present, amqplib will catch any throw from a user event handler and deliver it there instead of letting it propagate internally. The listener receives the thrown error and the name of the event whose handler threw.

Note that handler-error is not a replacement for the error event. The error event is emitted by amqplib itself when the connection or channel encounters a protocol-level error. The handler-error event is only emitted when your own event listener throws.

const connection = await amqp.connect('amqp://localhost');

connection.on('error', (err) => { /* handle protocol errors */ });
connection.on('handler-error', (err, event) => {
  console.error(`Uncaught exception in connection ${event} listener:`, err);
});

const channel = await connection.createChannel();

channel.on('error', (err) => { /* handle protocol errors */ });
channel.on('handler-error', (err, event) => {
  console.error(`Uncaught exception in channel ${event} listener:`, err);
});

If no handler-error listener is registered, behaviour is unchanged from previous versions.

Running tests

npm test

To run the tests RabbitMQ is required. Either install it with your package manager, or use docker to run a RabbitMQ instance.

docker run -d --name amqp.test -p 5672:5672 rabbitmq

If prefer not to run RabbitMQ locally it is also possible to use a instance of RabbitMQ hosted elsewhere. Use the URL environment variable to configure a different amqp host to connect to. You may also need to do this if docker is not on localhost; e.g., if it's running in docker-machine.

One public host is dev.rabbitmq.com:

URL=amqp://dev.rabbitmq.com npm test

NB You may experience test failures due to timeouts if using the dev.rabbitmq.com instance.

You can run it under different versions of Node.JS using nave:

nave use 10 npm test

or run the tests on all supported versions of Node.JS in one go:

make test-all-nodejs

(which also needs nave installed, of course).

Lastly, setting the environment variable LOG_ERRORS will cause the tests to output error messages encountered, to the console; this is really only useful for checking the kind and formatting of the errors.

LOG_ERRORS=true npm test

Test coverage

make coverage
open file://`pwd`/coverage/lcov-report/index.html

Extension points exported contracts — how you extend this code

ConfirmChannel (Interface)
(no doc) [1 implementers]
callback_api.d.ts
ConfirmChannel (Interface)
(no doc) [1 implementers]
index.d.ts
Empty (Interface)
(no doc)
lib/properties.d.ts
Connection (Interface)
(no doc)
callback_api.d.ts
Connection (Interface)
(no doc)
index.d.ts
AssertQueue (Interface)
(no doc)
lib/properties.d.ts
Channel (Interface)
(no doc)
callback_api.d.ts
ChannelModel (Interface)
(no doc)
index.d.ts

Core symbols most depended-on inside this repo

on
called by 189
index.d.ts
println
called by 168
bin/generate-defs.js
close
called by 120
index.d.ts
cb
called by 102
lib/recovery.js
send
called by 95
test/lib/util.js
wait
called by 73
test/lib/util.js
assertQueue
called by 69
index.d.ts
done
called by 68
lib/recovery.js

Shape

Function 316
Method 271
Class 38
Interface 34

Languages

TypeScript100%

Modules by API surface

lib/defs.js134 symbols
lib/recovery.js59 symbols
index.d.ts46 symbols
callback_api.d.ts45 symbols
lib/connection.js44 symbols
lib/callback_model.js42 symbols
lib/channel_model.js41 symbols
lib/channel.js36 symbols
bin/generate-defs.js28 symbols
lib/properties.d.ts23 symbols
test/recovery.test.js16 symbols
test/lib/util.js11 symbols

Dependencies from manifests, versioned

@biomejs/biome2.2.2 · 1×
@types/node25.6.0 · 1×
amqplib0.10.3 · 1×
claire0.4.1 · 1×
lefthook1.12.3 · 1×
typescript5.9.3 · 1×
uglify-js2.8.x · 1×
uuid* · 1×

For agents

$ claude mcp add amqplib \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact