A Job defines four TaskGroups: the `init_group`, the `epoch_group`, the `download_group` and the `exit_group`, which will be run by a JobRunner. The `init_group` will be run only once at startup. Its role is to initialize globally persistent blobs such as model weights, accumulators and data file lists.
class Job(context.Managed):
    """
    A Job defines four TaskGroups: the `init_group`, the `epoch_group`, the
    `download_group` and the `exit_group`, which will be run by a JobRunner.

    The `init_group` will be run only once at startup. Its role is to
    initialize globally persistent blobs such as model weights, accumulators
    and data file lists.

    The `epoch_group` will be run in a loop after `init_group`. The loop will
    exit when any of the stop signals added with `add_stop_condition` is True
    at the end of an epoch.

    The `download_group` will be run only once, after all the executions of
    `epoch_group` finish. Its role is to collect the distributed (scattered)
    parameters back after training.

    The `exit_group` will be run only once at the very end of the job. Its
    role is to save the results of training.

    Jobs are context-driven, so that Tasks can be added to the active Job
    without having to explicitly pass the job object around.

    Example of usage:

        def build_reader(partitions):
            with Job.current().init_group:
                reader = HiveReader(init_reader, ..., partitions)
                Task(step=init_reader)
            with Job.current().epoch_group:
                limited_reader = ReaderWithLimit(reader, num_iter=10000)
                data_queue = pipe(limited_reader, num_threads=8)
                Job.current().add_stop_condition(limited_reader.data_finished())
            return data_queue

        def build_hogwild_trainer(reader, model):
            with Job.current().init_group:
                Task(step=model.param_init_net)
            with Job.current().epoch_group:
                pipe(reader, processor=model, num_threads=8)
            with Job.current().exit_group:
                Task(step=model.save_model_net)

        with Job() as job:
            reader = build_reader(partitions)
            model = build_model(params)
            build_hogwild_trainer(reader, model)
    """
    def __init__(self,
                 init_group=None, epoch_group=None,
                 download_group=None, exit_group=None,
                 stop_conditions=None, nodes_to_checkpoint=None):
        # init_group uses the GLOBAL workspace type, matching its role of
        # creating globally persistent blobs.
        self.init_group = init_group or TaskGroup(
            workspace_type=WorkspaceType.GLOBAL)
        # The remaining groups fall back to a default TaskGroup when not given.
        self.epoch_group = epoch_group or TaskGroup()
        self.download_group = download_group or TaskGroup()
        self.exit_group = exit_group or TaskGroup()
        # Stop signals checked at the end of each epoch.
        self.stop_conditions = stop_conditions or []
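
The run order and the context-driven lookup described in the docstring can be illustrated with a small pure-Python sketch. It does not use Caffe2 and is not how `Job`, `TaskGroup` or `JobRunner` are implemented: `MiniJob`, `MiniTaskGroup`, `run_job` and the `max_epochs` safety cap are hypothetical names introduced only for this illustration, and groups here hold plain callables rather than Caffe2 Tasks.

```python
class MiniTaskGroup:
    """Hypothetical stand-in for a TaskGroup: an ordered list of callables."""

    def __init__(self):
        self.steps = []

    def add(self, step):
        self.steps.append(step)

    def run(self):
        for step in self.steps:
            step()


class MiniJob:
    """Hypothetical stand-in for Job: four groups plus stop conditions."""

    _stack = []  # active jobs, innermost last (sketch only, not thread-safe)

    def __init__(self):
        self.init_group = MiniTaskGroup()
        self.epoch_group = MiniTaskGroup()
        self.download_group = MiniTaskGroup()
        self.exit_group = MiniTaskGroup()
        self.stop_conditions = []

    def __enter__(self):
        MiniJob._stack.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        MiniJob._stack.pop()
        return False  # do not swallow exceptions

    @classmethod
    def current(cls):
        # Lets builder functions find the active job without receiving it
        # as an argument.
        if not cls._stack:
            raise RuntimeError("no active MiniJob")
        return cls._stack[-1]

    def add_stop_condition(self, predicate):
        self.stop_conditions.append(predicate)


def run_job(job, max_epochs=1000):
    """Run the groups in the order the docstring describes.

    max_epochs is an assumption of this sketch, added so the loop always
    terminates even if no stop condition ever becomes True.
    """
    job.init_group.run()                      # once, at startup
    for _ in range(max_epochs):
        job.epoch_group.run()                 # repeated
        if any(cond() for cond in job.stop_conditions):
            break                             # checked at the end of an epoch
    job.download_group.run()                  # once, after the epoch loop
    job.exit_group.run()                      # once, at the very end


log = []
state = {"epochs": 0}


def build_trainer():
    # The active job is looked up from context, not passed in.
    job = MiniJob.current()
    job.init_group.add(lambda: log.append("init"))

    def epoch():
        state["epochs"] += 1
        log.append("epoch")

    job.epoch_group.add(epoch)
    job.add_stop_condition(lambda: state["epochs"] >= 3)
    job.download_group.add(lambda: log.append("download"))
    job.exit_group.add(lambda: log.append("exit"))


with MiniJob() as job:
    build_trainer()

run_job(job)
print(log)
```

In this sketch the builder function only registers work while the job is active, and nothing executes until `run_job` is called. That mirrors the separation the docstring draws between defining a Job and having a JobRunner run it.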