github.com/pytorch/pytorch

Class Job

caffe2/python/checkpoint.py:25–112



# Names used by this excerpt come from these caffe2 modules:
from caffe2.python import context
from caffe2.python.task import TaskGroup, WorkspaceType


class Job(context.Managed):
    """
    A Job defines four TaskGroups: the `init_group`, the `epoch_group`, the
    `download_group` and the `exit_group`, which will be run by a JobRunner.

    The `init_group` will be run only once at startup. Its role is to
    initialize globally persistent blobs such as model weights, accumulators
    and data file lists.

    The `epoch_group` will be run in a loop after `init_group`. The loop will
    exit when any of the stop signals added with `add_stop_condition` is True
    at the end of an epoch.

    The `download_group` will be run only once, after all the executions of
    `epoch_group` finish. Its role is to collect the scattered (distributed)
    parameters back after training.

    The `exit_group` will be run only once at the very end of the job. Its
    role is to save the results of training at the end of the job.

    Jobs are context-driven, so that Tasks can be added to the active Job
    without having to explicitly pass the job object around.

    Example of usage:

        def build_reader(partitions):
            with Job.current().init_group:
                reader = HiveReader(init_reader, ..., partitions)
                Task(step=init_reader)
            with Job.current().epoch_group:
                limited_reader = ReaderWithLimit(reader, num_iter=10000)
                data_queue = pipe(limited_reader, num_threads=8)
                Job.current().add_stop_condition(limited_reader.data_finished())
            return data_queue

        def build_hogwild_trainer(reader, model):
            with Job.current().init_group:
                Task(step=model.param_init_net)
            with Job.current().epoch_group:
                pipe(reader, processor=model, num_threads=8)
            with Job.current().exit_group:
                Task(step=model.save_model_net)

        with Job() as job:
            reader = build_reader(partitions)
            model = build_model(params)
            build_hogwild_trainer(reader, model)
    """
    def __init__(self,
                 init_group=None, epoch_group=None,
                 download_group=None, exit_group=None,
                 stop_conditions=None, nodes_to_checkpoint=None):
        # Each group defaults to a new TaskGroup; only init_group asks for a
        # GLOBAL workspace, matching its role of creating persistent blobs.
        self.init_group = init_group or TaskGroup(
            workspace_type=WorkspaceType.GLOBAL)
        self.epoch_group = epoch_group or TaskGroup()
        self.download_group = download_group or TaskGroup()
        self.exit_group = exit_group or TaskGroup()
        self.stop_conditions = stop_conditions or []
        # ... (excerpt ends here; the class continues to line 112)
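
The run order the docstring describes (init once, epochs until a stop condition fires, then download and exit once each) can be modeled in plain Python. This is a minimal sketch under stated assumptions, not Caffe2's `JobRunner`: `MiniJob` and `run_job` are hypothetical names, each group is a zero-argument callable rather than a `TaskGroup`, and stop conditions are callables checked after every epoch, whereas the docstring's example passes a value produced by the reader, `limited_reader.data_finished()`. The `max_epochs` cap is an addition of the sketch so that a job with no stop conditions still terminates.

```python
from typing import Callable, List, Optional

Step = Callable[[], None]  # hypothetical stand-in for a TaskGroup


class MiniJob:
    """Hypothetical, simplified model of a Job's four groups (not Caffe2 code)."""

    def __init__(self,
                 init_group: Optional[Step] = None,
                 epoch_group: Optional[Step] = None,
                 download_group: Optional[Step] = None,
                 exit_group: Optional[Step] = None,
                 stop_conditions: Optional[List[Callable[[], bool]]] = None):
        def noop() -> None:
            pass
        # Same "argument or default" pattern as Job.__init__;
        # `or []` gives every job its own list of stop conditions.
        self.init_group = init_group or noop
        self.epoch_group = epoch_group or noop
        self.download_group = download_group or noop
        self.exit_group = exit_group or noop
        self.stop_conditions = stop_conditions or []

    def add_stop_condition(self, cond: Callable[[], bool]) -> None:
        self.stop_conditions.append(cond)


def run_job(job: MiniJob, max_epochs: int = 1000) -> int:
    """Run the groups in the documented order; return the epoch count."""
    job.init_group()                 # once, at startup
    epochs = 0
    while epochs < max_epochs:       # max_epochs: safety cap for this sketch
        job.epoch_group()            # one full epoch
        epochs += 1
        # Stop when ANY condition is true at the end of an epoch.
        if any(cond() for cond in job.stop_conditions):
            break
    job.download_group()             # once, after the epoch loop
    job.exit_group()                 # once, at the very end
    return epochs


# Usage: three "batches" of data, so the stop condition fires after epoch 3.
log = []
state = {"batches_left": 3}


def epoch() -> None:
    log.append("epoch")
    state["batches_left"] -= 1


job = MiniJob(init_group=lambda: log.append("init"),
              epoch_group=epoch,
              download_group=lambda: log.append("download"),
              exit_group=lambda: log.append("exit"))
job.add_stop_condition(lambda: state["batches_left"] == 0)
n = run_job(job)
print(log)  # ['init', 'epoch', 'epoch', 'epoch', 'download', 'exit']
print(n)    # 3
```

Because the conditions are evaluated only after an epoch completes, an epoch that has started always runs to its end, which matches the docstring's "at the end of an epoch" wording.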

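Saying that Jobs are context-driven means helper functions such as `build_reader` reach the active Job through `Job.current()` instead of taking it as an argument. One common way to get that behavior is a per-class stack of active instances, pushed in `__enter__` and popped in `__exit__`. The sketch below assumes that design; it is not the source of `caffe2.python.context.Managed`, and `ManagedSketch`, `SketchJob`, `build_reader_sketch` and the `required` flag are all hypothetical.

```python
import threading


class ManagedSketch:
    """Hypothetical mixin: the innermost active `with` instance is current."""

    _local = threading.local()  # keeps a separate set of stacks per thread

    @classmethod
    def _stack(cls):
        # One stack per concrete class, stored in thread-local storage.
        stacks = getattr(cls._local, "stacks", None)
        if stacks is None:
            stacks = cls._local.stacks = {}
        return stacks.setdefault(cls, [])

    @classmethod
    def current(cls, required=True):
        stack = cls._stack()
        if not stack:
            if required:
                raise RuntimeError(f"no active {cls.__name__} context")
            return None
        return stack[-1]

    def __enter__(self):
        type(self)._stack().append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        popped = type(self)._stack().pop()
        assert popped is self, "contexts exited out of order"
        return False  # never swallow exceptions raised in the with-body


class SketchJob(ManagedSketch):
    """Hypothetical Job stand-in that just records task names per group."""

    def __init__(self):
        self.init_tasks = []
        self.epoch_tasks = []


def build_reader_sketch():
    # No job argument: the helper looks up whichever SketchJob is active.
    active = SketchJob.current()
    active.init_tasks.append("init_reader")
    active.epoch_tasks.append("read_batch")


with SketchJob() as job:
    build_reader_sketch()

print(job.init_tasks)                      # ['init_reader']
print(SketchJob.current(required=False))   # None
```

Keeping a stack rather than a single "current" slot is what lets `with` blocks nest: leaving an inner job restores the outer one as current.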