| 1198 | |
| 1199 | |
| 1200 | def main_worker_helper(options, args): |
| 1201 | N = int(options.max_jobs) |
| 1202 | if options.last_job_timeout is not None: |
| 1203 | end_time = time.time() + float(options.last_job_timeout) |
| 1204 | else: |
| 1205 | end_time = None |
| 1206 | |
def sighandler_shutdown(signum, frame):
    """Signal handler that logs the received signal and raises ``Shutdown``.

    The enclosing worker helper installs this handler for SIGHUP, SIGINT
    and SIGTERM.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame that was interrupted; not used.

    Raises:
        Shutdown: Always, constructed with ``signum``.
    """
    notice = "Caught signal %i, shutting down." % signum
    logger.info(notice)
    raise Shutdown(signum)
| 1210 | |
def sighandler_wait_quit(signum, frame):
    """Signal handler that logs the received signal and raises ``WaitQuit``.

    The enclosing worker helper installs this handler for SIGUSR1. The log
    line is worded the same as the one emitted by the shutdown handler; only
    the raised exception type differs.
    NOTE(review): how callers treat ``WaitQuit`` differently from
    ``Shutdown`` is not visible in this part of the file -- confirm there.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame that was interrupted; not used.

    Raises:
        WaitQuit: Always, constructed with ``signum``.
    """
    notice = "Caught signal %i, shutting down." % signum
    logger.info(notice)
    raise WaitQuit(signum)
| 1214 | |
| 1215 | is_windows = os.name == "nt" |
| 1216 | if not is_windows: |
| 1217 | signal.signal(signal.SIGHUP, sighandler_shutdown) |
| 1218 | signal.signal(signal.SIGUSR1, sighandler_wait_quit) |
| 1219 | signal.signal(signal.SIGINT, sighandler_shutdown) |
| 1220 | signal.signal(signal.SIGTERM, sighandler_shutdown) |
| 1221 | |
| 1222 | if N > 1: |
| 1223 | proc = None |
| 1224 | cons_errs = 0 |
| 1225 | |
| 1226 | while N and cons_errs < int(options.max_consecutive_failures): |
| 1227 | # exit due to time limit: |
| 1228 | if end_time and time.time() > end_time: |
| 1229 | logger.info("Exiting due to last_job_timeout") |
| 1230 | return |
| 1231 | |
| 1232 | # exit due to threshold on number of jobs: |
| 1233 | if ( |
| 1234 | options.max_jobs_in_db is not None |
| 1235 | and options.max_jobs_in_db != sys.maxsize |
| 1236 | ): |
| 1237 | num_jobs_db = number_of_jobs_in_db(options) |
| 1238 | if int(num_jobs_db) >= int(options.max_jobs_in_db): |
| 1239 | logger.info( |
| 1240 | "Exiting because there are " |
| 1241 | + str(num_jobs_db) |
| 1242 | + " jobs in the database, but the limit is " |
| 1243 | + str(options.max_jobs_in_db) |
| 1244 | ) |
| 1245 | return |
| 1246 | |
| 1247 | # try to run one MongoWorker |
| 1248 | try: |
| 1249 | if options.use_subprocesses: |
| 1250 | # recursive Popen, dropping N from the argv |
| 1251 | # By using another process to run this job |
| 1252 | # we protect ourselves from memory leaks, bad cleanup |
| 1253 | # and other annoying details. |
| 1254 | # The tradeoff is that a large dataset must be reloaded once for |
| 1255 | # each subprocess. |
| 1256 | sub_argv = [ |
| 1257 | sys.argv[0], |