(self, event_name: str)
| 114 | ) |
| 115 | |
| 116 | def _event_end(self, event_name: str): |
| 117 | if not self.pipeline_stage_active: |
| 118 | raise RuntimeError("No pipeline stage is currently active.") |
| 119 | |
| 120 | if not self.event_stack: |
| 121 | raise RuntimeError("No parent event is currently active.") |
| 122 | |
| 123 | if self.event_stack: |
| 124 | current_event_log = self.event_stack[-1] |
| 125 | if event_name in current_event_log.get_child_events(): |
| 126 | current_event_log.get_child_events()[event_name].record_end_time() |
| 127 | elif ( |
| 128 | event_name |
| 129 | in self.logging_dict[self.current_pipeline_stage]["time_usage"] |
| 130 | ): |
| 131 | self.logging_dict[self.current_pipeline_stage]["time_usage"][ |
| 132 | event_name |
| 133 | ].record_end_time() |
| 134 | else: |
| 135 | raise AssertionError( |
| 136 | f"Failure to record end time for event {event_name}. Start time is not recorded." |
| 137 | ) |
| 138 | if current_event_log.event_name == event_name: |
| 139 | self.event_stack.pop() |
| 140 | else: |
| 141 | raise RuntimeError("Cannot end an event without an active parent event.") |
| 142 | |
| 143 | def _pipeline_stage_end(self): |
| 144 | if not self.pipeline_stage_active: |
no test coverage detected