MCPcopy
hub / github.com/deepspeedai/DeepSpeed / PipelineEngine

Class PipelineEngine

deepspeed/runtime/pipe/engine.py:60–1381  ·  view source on GitHub ↗

A training engine for hybrid pipeline, data, and model parallel training. This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule` is provided.

Source from the content-addressed store, hash-verified

58
59
60class PipelineEngine(DeepSpeedEngine):
61 """ A training engine hybrid pipeline, data, and model parallel training.
62
63 This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule`
64 is provided.
65 """
66 ID_TO_DTYPE = [
67 torch.float32, torch.float64, torch.complex64, torch.complex128, torch.float16, torch.bfloat16, torch.uint8,
68 torch.int8, torch.int16, torch.int32, torch.int64, torch.bool
69 ]
70 DTYPE_TO_ID = {dtype: id_ for id_, dtype in enumerate(ID_TO_DTYPE)}
71
72 def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs):
73 super().__init__(*super_args, **super_kwargs)
74 assert isinstance(self.module, PipelineModule), "model must base PipelineModule"
75
76 assert self.zero_optimization_stage(
77 ) < ZeroStageEnum.gradients, "ZeRO-2 and ZeRO-3 are incompatible with pipeline parallelism"
78
79 # We schedule the all-reduces, so disable it in super().backward()
80 self.enable_backward_allreduce = False
81 self.has_bool_tensors = has_bool_tensors
82 self.eval_return_logits = False
83 self.outputs = None
84 # BF16 Optimizer is hardcoded for fp32 gradient accumulation
85 self.using_bf16_optimizer = type(self.optimizer) == BF16_Optimizer
86
87 # used to disable the pipeline all-reduce when used with 1-bit Adam/1-bit LAMB
88 self.pipeline_enable_backward_allreduce = True
89
90 if self.elasticity_enabled():
91 if not self.is_elastic_model_parallel_supported():
92 assert not self.elasticity_enabled(), "Elasticity is not currently supported" \
93 " with pipeline parallelism."
94
95 # pipeline step for logging
96 self.log_batch_step_id = -1
97
98 self.micro_batch_size = self.train_micro_batch_size_per_gpu()
99 self.micro_batches = self.gradient_accumulation_steps()
100
101 # Set Grid and Communication Groups
102 self.grid = self.module._grid
103 if self.grid.get_global_rank() == 0:
104 logger.info(f'CONFIG: micro_batches={self.micro_batches} '
105 f'micro_batch_size={self.micro_batch_size}')
106
107 self.global_rank = self.grid.get_global_rank()
108
109 assert self.dp_world_size == self.grid.data_parallel_size
110 assert self.train_batch_size() == \
111 self.micro_batch_size * self.micro_batches * self.grid.data_parallel_size
112
113 # Set Stage Inf
114 self.num_stages = self.grid.pipe_parallel_size
115 self.stage_id = self.grid.get_stage_id()
116 self.prev_stage = self.stage_id - 1
117 self.next_stage = self.stage_id + 1

Callers 1

initializeFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…