(self, event_name: str)
| 76 | self.pipeline_stage_active = True |
| 77 | |
def _event_start(self, event_name: str):
    """Start (or restart) timing of ``event_name`` in the current stage.

    When ``event_stack`` is empty the event is looked up directly in the
    current pipeline stage's ``"time_usage"`` mapping; otherwise it is
    looked up among the child events of the event on top of the stack.

    A name seen for the first time gets a fresh ``EventLog`` whose start
    time is recorded, which is registered and then pushed onto
    ``event_stack``. A name that already exists only has
    ``record_start_time()`` called on the existing entry.

    Args:
        event_name: Key identifying the event within its parent scope.

    Raises:
        RuntimeError: If ``pipeline_stage_active`` is falsy, or if the
            stack is empty and ``current_pipeline_stage`` is falsy.
    """
    if not self.pipeline_stage_active:
        raise RuntimeError("No pipeline stage is currently active.")

    if self.event_stack:
        # Nested event: it belongs to the innermost event still on the stack.
        enclosing = self.event_stack[-1]
        if event_name in enclosing.get_child_events():
            # NOTE(review): a restarted child is not pushed back onto
            # event_stack -- confirm _event_end copes with that.
            enclosing.get_child_events()[event_name].record_start_time()
            return

        child = EventLog(event_name=event_name)
        child.record_start_time()
        enclosing.add_child_event(child)
        # The child is also stored flat under the stage's "time_usage";
        # this replaces any entry already stored under the same name.
        self.logging_dict[self.current_pipeline_stage]["time_usage"][
            event_name
        ] = child
        self.event_stack.append(child)
        return

    if not self.current_pipeline_stage:
        raise RuntimeError(
            "Cannot start an event without an active pipeline stage or parent event."
        )

    # Top-level event (directly under the pipeline stage).
    stage_timings = self.logging_dict[self.current_pipeline_stage]["time_usage"]
    if event_name in stage_timings:
        # NOTE(review): as above, the existing event is restarted but not
        # pushed onto event_stack -- verify against _event_end.
        stage_timings[event_name].record_start_time()
        return

    top_event = EventLog(event_name=event_name)
    top_event.record_start_time()
    stage_timings[event_name] = top_event
    self.event_stack.append(top_event)
| 115 | |
| 116 | def _event_end(self, event_name: str): |
| 117 | if not self.pipeline_stage_active: |
no test coverage detected