MCPcopy Index your code
hub / github.com/bee-queue/bee-queue / connect

Method connect

lib/queue.js:92–145  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

90 }
91
92 connect() {
93 return new Promise((resolve, reject) => {
94 try {
95 if (this._isReady) return resolve(this._isReady);
96
97 const getEventPromise = () => {
98 if (this.settings.getEvents || this.settings.activateDelayedJobs) {
99 return this.makeClient('eclient', true).then(() => {
100 this.eclient.on('message', this._onMessage.bind(this));
101 const channels = [];
102 if (this.settings.getEvents) {
103 channels.push(this.toKey('events'));
104 }
105 if (this.settings.activateDelayedJobs) {
106 channels.push(this.toKey('earlierDelayed'));
107 }
108 return Promise.all(
109 channels.map((channel) =>
110 helpers.callAsync((done) =>
111 this.eclient.subscribe(channel, done)
112 )
113 )
114 );
115 });
116 }
117
118 return null;
119 };
120
121 const eventsPromise = getEventPromise();
122 // Wait for Lua scripts and client connections to load. Also wait for
123 // bclient and eclient/subscribe if they're needed.
124 this._ready = Promise.all([
125 // Make the clients
126 this.makeClient('client', false),
127 this.settings.isWorker ? this.makeClient('bclient', true) : null,
128 eventsPromise,
129 ])
130 .then(() => {
131 if (this.settings.ensureScripts) {
132 return lua.buildCache(this.client);
133 }
134 })
135 .then(() => {
136 this._isReady = true;
137 setImmediate(() => this.emit('ready'));
138 resolve(this._isReady);
139 return this;
140 });
141 } catch (err) {
142 reject(err);
143 }
144 });
145 }
146
147 _onMessage(channel, message) {
148 if (channel === this.toKey('earlierDelayed')) {

Callers 2

constructorMethod · 0.95
queue-test.jsFile · 0.80

Calls 2

makeClientMethod · 0.95
rejectFunction · 0.85

Tested by

no test coverage detected