| 996 | |
| 997 | |
| 998 | class MongoWorker: |
| 999 | poll_interval = 3.0 # -- seconds |
| 1000 | workdir = None |
| 1001 | |
| 1002 | def __init__( |
| 1003 | self, |
| 1004 | mj, |
| 1005 | poll_interval=poll_interval, |
| 1006 | workdir=workdir, |
| 1007 | exp_key=None, |
| 1008 | logfilename="logfile.txt", |
| 1009 | ): |
| 1010 | """ |
| 1011 | mj - MongoJobs interface to jobs collection |
| 1012 | poll_interval - seconds |
| 1013 | workdir - string |
| 1014 | exp_key - restrict reservations to this key |
| 1015 | """ |
| 1016 | self.mj = mj |
| 1017 | self.poll_interval = poll_interval |
| 1018 | self.workdir = workdir |
| 1019 | self.exp_key = exp_key |
| 1020 | self.logfilename = logfilename |
| 1021 | |
| 1022 | def make_log_handler(self): |
| 1023 | self.log_handler = logging.FileHandler(self.logfilename) |
| 1024 | self.log_handler.setFormatter( |
| 1025 | logging.Formatter(fmt="%(levelname)s (%(name)s): %(message)s") |
| 1026 | ) |
| 1027 | self.log_handler.setLevel(logging.INFO) |
| 1028 | |
| 1029 | def run_one(self, host_id=None, reserve_timeout=None, erase_created_workdir=False): |
| 1030 | if host_id == None: |
| 1031 | host_id = ("%s:%i" % (socket.gethostname(), os.getpid()),) |
| 1032 | job = None |
| 1033 | start_time = time.time() |
| 1034 | mj = self.mj |
| 1035 | while job is None: |
| 1036 | if reserve_timeout and (time.time() - start_time) > reserve_timeout: |
| 1037 | raise ReserveTimeout() |
| 1038 | job = mj.reserve(host_id, exp_key=self.exp_key) |
| 1039 | if not job: |
| 1040 | interval = 1 + numpy.random.rand() * (float(self.poll_interval) - 1.0) |
| 1041 | logger.info("no job found, sleeping for %.1fs" % interval) |
| 1042 | time.sleep(interval) |
| 1043 | |
| 1044 | logger.debug("job found: %s" % str(job)) |
| 1045 | |
| 1046 | # -- don't let the cmd mess up our trial object |
| 1047 | spec = spec_from_misc(job["misc"]) |
| 1048 | |
| 1049 | ctrl = MongoCtrl( |
| 1050 | trials=MongoTrials(mj, exp_key=job["exp_key"], refresh=False), |
| 1051 | read_only=False, |
| 1052 | current_trial=job, |
| 1053 | ) |
| 1054 | if self.workdir is None: |
| 1055 | workdir = job["misc"].get("workdir", os.getcwd()) |
no outgoing calls