MCPcopy
hub / github.com/stanford-oval/storm / _event_end

Method _event_end

knowledge_storm/logging_wrapper.py:116–141  ·  view source on GitHub ↗
(self, event_name: str)

Source from the content-addressed store, hash-verified

114 )
115
116 def _event_end(self, event_name: str):
117 if not self.pipeline_stage_active:
118 raise RuntimeError("No pipeline stage is currently active.")
119
120 if not self.event_stack:
121 raise RuntimeError("No parent event is currently active.")
122
123 if self.event_stack:
124 current_event_log = self.event_stack[-1]
125 if event_name in current_event_log.get_child_events():
126 current_event_log.get_child_events()[event_name].record_end_time()
127 elif (
128 event_name
129 in self.logging_dict[self.current_pipeline_stage]["time_usage"]
130 ):
131 self.logging_dict[self.current_pipeline_stage]["time_usage"][
132 event_name
133 ].record_end_time()
134 else:
135 raise AssertionError(
136 f"Failure to record end time for event {event_name}. Start time is not recorded."
137 )
138 if current_event_log.event_name == event_name:
139 self.event_stack.pop()
140 else:
141 raise RuntimeError("Cannot end an event without an active parent event.")
142
143 def _pipeline_stage_end(self):
144 if not self.pipeline_stage_active:

Callers 1

log_eventMethod · 0.95

Calls 2

get_child_eventsMethod · 0.80
record_end_timeMethod · 0.80

Tested by

no test coverage detected