(args, worker_id, logging_pipe, canteen, event)
| 411 | |
| 412 | |
| 413 | def worker_process(args, worker_id, logging_pipe, canteen, event): |
| 414 | running = True |
| 415 | |
| 416 | def termhandler(signum, frame): |
| 417 | nonlocal running |
| 418 | if running: |
| 419 | logger.info("Stopping worker process...") |
| 420 | running = False |
| 421 | else: |
| 422 | logger.warning("Killing worker process...") |
| 423 | return sys.exit(RET_KILLED) |
| 424 | |
| 425 | signal.signal(signal.SIGINT, signal.SIG_IGN) |
| 426 | signal.signal(signal.SIGTERM, termhandler) |
| 427 | if hasattr(signal, "SIGHUP"): |
| 428 | signal.signal(signal.SIGHUP, termhandler) |
| 429 | if hasattr(signal, "SIGBREAK"): |
| 430 | signal.signal(signal.SIGBREAK, termhandler) |
| 431 | |
| 432 | # Unblock the blocked signals inherited from the parent process |
| 433 | # before we start any worker threads and trigger middleware hooks. |
| 434 | try_unblock_signals() |
| 435 | try: |
| 436 | # Re-seed the random number generator from urandom on |
| 437 | # supported platforms. This should make it so that worker |
| 438 | # processes don't all follow the same sequence. |
| 439 | random.seed() |
| 440 | |
| 441 | logger = setup_worker_logging(args, worker_id, logging_pipe) |
| 442 | logger.debug("Loading broker...") |
| 443 | module, broker = import_broker(args.broker) |
| 444 | broker.emit_after("process_boot") |
| 445 | |
| 446 | logger.debug("Loading modules...") |
| 447 | for module in args.modules: |
| 448 | importlib.import_module(module) |
| 449 | |
| 450 | with canteen_try_init(canteen) as acquired: |
| 451 | if acquired: |
| 452 | logger.debug("Sending forks to main process...") |
| 453 | for middleware in broker.middleware: |
| 454 | for fork in middleware.forks: |
| 455 | fork_path = "%s:%s" % (fork.__module__, fork.__name__) |
| 456 | canteen_add(canteen, fork_path) |
| 457 | |
| 458 | logger.debug("Starting worker threads...") |
| 459 | worker = Worker(broker, queues=args.queues, worker_threads=args.threads) |
| 460 | worker.start() |
| 461 | except ImportError: |
| 462 | logger.exception("Failed to import module.") |
| 463 | return sys.exit(RET_IMPORT) |
| 464 | except BrokerConnectionError: |
| 465 | logger.exception("Broker connection failed.") |
| 466 | return sys.exit(RET_CONNECT) |
| 467 | finally: |
| 468 | # Signal to the master process that this process has booted, |
| 469 | # regardless of whether it failed or not. If it did fail, the |
| 470 | # worker process will realize that soon enough. |
nothing calls this directly
no test coverage detected