A training engine for hybrid pipeline, data, and model parallel training. This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule` is provided.
| 58 | |
| 59 | |
| 60 | class PipelineEngine(DeepSpeedEngine): |
| 61 | """ A training engine hybrid pipeline, data, and model parallel training. |
| 62 | |
| 63 | This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule` |
| 64 | is provided. |
| 65 | """ |
| 66 | ID_TO_DTYPE = [ |
| 67 | torch.float32, torch.float64, torch.complex64, torch.complex128, torch.float16, torch.bfloat16, torch.uint8, |
| 68 | torch.int8, torch.int16, torch.int32, torch.int64, torch.bool |
| 69 | ] |
| 70 | DTYPE_TO_ID = {dtype: id_ for id_, dtype in enumerate(ID_TO_DTYPE)} |
| 71 | |
| 72 | def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs): |
| 73 | super().__init__(*super_args, **super_kwargs) |
| 74 | assert isinstance(self.module, PipelineModule), "model must base PipelineModule" |
| 75 | |
| 76 | assert self.zero_optimization_stage( |
| 77 | ) < ZeroStageEnum.gradients, "ZeRO-2 and ZeRO-3 are incompatible with pipeline parallelism" |
| 78 | |
| 79 | # We schedule the all-reduces, so disable it in super().backward() |
| 80 | self.enable_backward_allreduce = False |
| 81 | self.has_bool_tensors = has_bool_tensors |
| 82 | self.eval_return_logits = False |
| 83 | self.outputs = None |
| 84 | # BF16 Optimizer is hardcoded for fp32 gradient accumulation |
| 85 | self.using_bf16_optimizer = type(self.optimizer) == BF16_Optimizer |
| 86 | |
| 87 | # used to disable the pipeline all-reduce when used with 1-bit Adam/1-bit LAMB |
| 88 | self.pipeline_enable_backward_allreduce = True |
| 89 | |
| 90 | if self.elasticity_enabled(): |
| 91 | if not self.is_elastic_model_parallel_supported(): |
| 92 | assert not self.elasticity_enabled(), "Elasticity is not currently supported" \ |
| 93 | " with pipeline parallelism." |
| 94 | |
| 95 | # pipeline step for logging |
| 96 | self.log_batch_step_id = -1 |
| 97 | |
| 98 | self.micro_batch_size = self.train_micro_batch_size_per_gpu() |
| 99 | self.micro_batches = self.gradient_accumulation_steps() |
| 100 | |
| 101 | # Set Grid and Communication Groups |
| 102 | self.grid = self.module._grid |
| 103 | if self.grid.get_global_rank() == 0: |
| 104 | logger.info(f'CONFIG: micro_batches={self.micro_batches} ' |
| 105 | f'micro_batch_size={self.micro_batch_size}') |
| 106 | |
| 107 | self.global_rank = self.grid.get_global_rank() |
| 108 | |
| 109 | assert self.dp_world_size == self.grid.data_parallel_size |
| 110 | assert self.train_batch_size() == \ |
| 111 | self.micro_batch_size * self.micro_batches * self.grid.data_parallel_size |
| 112 | |
| 113 | # Set Stage Inf |
| 114 | self.num_stages = self.grid.pipe_parallel_size |
| 115 | self.stage_id = self.grid.get_stage_id() |
| 116 | self.prev_stage = self.stage_id - 1 |
| 117 | self.next_stage = self.stage_id + 1 |
no outgoing calls
no test coverage detected
searching dependent graphs…