MCPcopy
hub / github.com/stanford-oval/storm / _event_start

Method _event_start

knowledge_storm/logging_wrapper.py:78–114  ·  view source on GitHub ↗
(self, event_name: str)

Source from the content-addressed store, hash-verified

76 self.pipeline_stage_active = True
77
78 def _event_start(self, event_name: str):
79 if not self.pipeline_stage_active:
80 raise RuntimeError("No pipeline stage is currently active.")
81
82 if not self.event_stack and self.current_pipeline_stage:
83 # Top-level event (directly under the pipeline stage)
84 if (
85 event_name
86 not in self.logging_dict[self.current_pipeline_stage]["time_usage"]
87 ):
88 event = EventLog(event_name=event_name)
89 event.record_start_time()
90 self.logging_dict[self.current_pipeline_stage]["time_usage"][
91 event_name
92 ] = event
93 self.event_stack.append(event)
94 else:
95 self.logging_dict[self.current_pipeline_stage]["time_usage"][
96 event_name
97 ].record_start_time()
98 elif self.event_stack:
99 # Nested event (under another event)
100 parent_event = self.event_stack[-1]
101 if event_name not in parent_event.get_child_events():
102 event = EventLog(event_name=event_name)
103 event.record_start_time()
104 parent_event.add_child_event(event)
105 self.logging_dict[self.current_pipeline_stage]["time_usage"][
106 event_name
107 ] = event
108 self.event_stack.append(event)
109 else:
110 parent_event.get_child_events()[event_name].record_start_time()
111 else:
112 raise RuntimeError(
113 "Cannot start an event without an active pipeline stage or parent event."
114 )
115
116 def _event_end(self, event_name: str):
117 if not self.pipeline_stage_active:

Callers 1

log_eventMethod · 0.95

Calls 4

record_start_timeMethod · 0.95
EventLogClass · 0.85
get_child_eventsMethod · 0.80
add_child_eventMethod · 0.80

Tested by

no test coverage detected