Finds and runs a pending task that in the first of the sorting list. Parameters ----------- task_name : str The task name. sort : List of tuple PyMongo sort comment, search "PyMongo find one sorting" and `collection level operations <http://ap
(self, task_name=None, sort=None, **kwargs)
| 588 | logging.info("[Database] Saved Task - task_name: {} script: {}".format(task_name, script)) |
| 589 | |
| 590 | def run_top_task(self, task_name=None, sort=None, **kwargs): |
| 591 | """Finds and runs a pending task that in the first of the sorting list. |
| 592 | |
| 593 | Parameters |
| 594 | ----------- |
| 595 | task_name : str |
| 596 | The task name. |
| 597 | sort : List of tuple |
| 598 | PyMongo sort comment, search "PyMongo find one sorting" and `collection level operations <http://api.mongodb.com/python/current/api/pymongo/collection.html>`__ for more details. |
| 599 | kwargs : other parameters |
| 600 | Users customized parameters such as description, version number. |
| 601 | |
| 602 | Examples |
| 603 | --------- |
| 604 | Monitors the database and pull tasks to run |
| 605 | >>> while True: |
| 606 | >>> print("waiting task from distributor") |
| 607 | >>> db.run_top_task(task_name='mnist', sort=[("time", -1)]) |
| 608 | >>> time.sleep(1) |
| 609 | |
| 610 | Returns |
| 611 | -------- |
| 612 | boolean : True for success, False for fail. |
| 613 | """ |
| 614 | if not isinstance(task_name, str): # is None: |
| 615 | raise Exception("task_name should be string") |
| 616 | self._fill_project_info(kwargs) |
| 617 | kwargs.update({'status': 'pending'}) |
| 618 | |
| 619 | # find task and set status to running |
| 620 | task = self.db.Task.find_one_and_update(kwargs, {'$set': {'status': 'running'}}, sort=sort) |
| 621 | |
| 622 | # try: |
| 623 | # get task info e.g. hyper parameters, python script |
| 624 | if task is None: |
| 625 | logging.info("[Database] Find Task FAIL: key: {} sort: {}".format(task_name, sort)) |
| 626 | return False |
| 627 | else: |
| 628 | logging.info("[Database] Find Task SUCCESS: key: {} sort: {}".format(task_name, sort)) |
| 629 | _datetime = task['time'] |
| 630 | _script = task['script'] |
| 631 | _id = task['_id'] |
| 632 | _hyper_parameters = task['hyper_parameters'] |
| 633 | _saved_result_keys = task['saved_result_keys'] |
| 634 | logging.info(" hyper parameters:") |
| 635 | for key in _hyper_parameters: |
| 636 | globals()[key] = _hyper_parameters[key] |
| 637 | logging.info(" {}: {}".format(key, _hyper_parameters[key])) |
| 638 | # run task |
| 639 | s = time.time() |
| 640 | logging.info("[Database] Start Task: key: {} sort: {} push time: {}".format(task_name, sort, _datetime)) |
| 641 | _script = _script.decode('utf-8') |
| 642 | with tf.Graph().as_default(): # # as graph: # clear all TF graphs |
| 643 | exec(_script, globals()) |
| 644 | |
| 645 | # set status to finished |
| 646 | _ = self.db.Task.find_one_and_update({'_id': _id}, {'$set': {'status': 'finished'}}) |
| 647 |
no test coverage detected