MCPcopy Index your code
hub / github.com/nodejs/node / handleMessage

Function handleMessage

lib/internal/modules/esm/worker.js:226–282  ·  view source on GitHub ↗

* Handles incoming messages from the main thread or other workers. * @param {object} options - The options object. * @param {string} options.method - The name of the hook. * @param {Array} options.args - The arguments to pass to the method. * @param {MessagePort} options.port - The messa

({ method, args, port })

Source from the content-addressed store, hash-verified

224 * @param {MessagePort} options.port - The message port to use for communication.
225 */
226 async function handleMessage({ method, args, port }) {
227 // Each potential exception needs to be caught individually so that the correct error is sent to
228 // the main thread.
229 let hasError = false;
230 let shouldRemoveGlobalErrorHandler = false;
231 assert(typeof asyncLoaderHooks[method] === 'function', `${method} is not implemented in the loader worker`);
232 if (port == null && !hasUncaughtExceptionCaptureCallback()) {
233 // When receiving sync messages, we want to unlock the main thread when there's an exception.
234 process.on('uncaughtException', errorHandler);
235 shouldRemoveGlobalErrorHandler = true;
236 }
237
238 // We are about to yield the execution with `await ReflectApply` below. In case the code
239 // following the `await` never runs, we remove the message handler so the `beforeExit` event
240 // can be triggered.
241 syncCommPort.off('message', handleMessage);
242
243 // We keep checking for new messages to not miss any.
244 clearImmediate(immediate);
245 immediate = setImmediate(checkForMessages).unref();
246
247 unsettledResponsePorts.add(port ?? syncCommPort);
248
249 let response;
250 try {
251 response = await ReflectApply(asyncLoaderHooks[method], asyncLoaderHooks, args);
252 } catch (exception) {
253 hasError = true;
254 response = exception;
255 }
256
257 unsettledResponsePorts.delete(port ?? syncCommPort);
258
259 // Send the method response (or exception) to the main thread.
260 try {
261 (port ?? syncCommPort).postMessage(
262 wrapMessage(hasError ? 'error' : 'success', response),
263 transferArrayBuffer(hasError, response?.source),
264 );
265 } catch (exception) {
266 // Or send the exception thrown when trying to send the response.
267 (port ?? syncCommPort).postMessage(wrapMessage('error', exception));
268 }
269
270 if (shouldRemoveGlobalErrorHandler) {
271 process.off('uncaughtException', errorHandler);
272 }
273
274 syncCommPort.off('message', handleMessage);
275 // We keep checking for new messages to not miss any.
276 clearImmediate(immediate);
277 immediate = setImmediate(checkForMessages).unref();
278 // To prevent the main thread from terminating before this function completes after unlocking,
279 // the following process is executed at the end of the function.
280 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
281 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
282 }
283}

Callers 1

checkForMessagesFunction · 0.70

Calls 12

clearImmediateFunction · 0.85
wrapMessageFunction · 0.85
transferArrayBufferFunction · 0.70
addMethod · 0.65
deleteMethod · 0.65
assertFunction · 0.50
setImmediateFunction · 0.50
onMethod · 0.45
offMethod · 0.45
unrefMethod · 0.45
postMessageMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…