MCPcopy
hub / github.com/Bogdanp/dramatiq / worker_process

Function worker_process

dramatiq/cli.py:413–480  ·  view source on GitHub ↗
(args, worker_id, logging_pipe, canteen, event)

Source from the content-addressed store, hash-verified

411
412
413def worker_process(args, worker_id, logging_pipe, canteen, event):
414 running = True
415
416 def termhandler(signum, frame):
417 nonlocal running
418 if running:
419 logger.info("Stopping worker process...")
420 running = False
421 else:
422 logger.warning("Killing worker process...")
423 return sys.exit(RET_KILLED)
424
425 signal.signal(signal.SIGINT, signal.SIG_IGN)
426 signal.signal(signal.SIGTERM, termhandler)
427 if hasattr(signal, "SIGHUP"):
428 signal.signal(signal.SIGHUP, termhandler)
429 if hasattr(signal, "SIGBREAK"):
430 signal.signal(signal.SIGBREAK, termhandler)
431
432 # Unblock the blocked signals inherited from the parent process
433 # before we start any worker threads and trigger middleware hooks.
434 try_unblock_signals()
435 try:
436 # Re-seed the random number generator from urandom on
437 # supported platforms. This should make it so that worker
438 # processes don't all follow the same sequence.
439 random.seed()
440
441 logger = setup_worker_logging(args, worker_id, logging_pipe)
442 logger.debug("Loading broker...")
443 module, broker = import_broker(args.broker)
444 broker.emit_after("process_boot")
445
446 logger.debug("Loading modules...")
447 for module in args.modules:
448 importlib.import_module(module)
449
450 with canteen_try_init(canteen) as acquired:
451 if acquired:
452 logger.debug("Sending forks to main process...")
453 for middleware in broker.middleware:
454 for fork in middleware.forks:
455 fork_path = "%s:%s" % (fork.__module__, fork.__name__)
456 canteen_add(canteen, fork_path)
457
458 logger.debug("Starting worker threads...")
459 worker = Worker(broker, queues=args.queues, worker_threads=args.threads)
460 worker.start()
461 except ImportError:
462 logger.exception("Failed to import module.")
463 return sys.exit(RET_IMPORT)
464 except BrokerConnectionError:
465 logger.exception("Broker connection failed.")
466 return sys.exit(RET_CONNECT)
467 finally:
468 # Signal to the master process that this process has booted,
469 # regardless of whether it failed or not. If it did fail, the
470 # worker process will realize that soon enough.

Callers

nothing calls this directly

Calls 9

startMethod · 0.95
stopMethod · 0.95
canteen_try_initFunction · 0.90
canteen_addFunction · 0.90
WorkerClass · 0.90
try_unblock_signalsFunction · 0.85
import_brokerFunction · 0.85
emit_afterMethod · 0.80
closeMethod · 0.45

Tested by

no test coverage detected