MCPcopy Index your code
hub / github.com/pathwaycom/pathway / _serialize_graph

Function _serialize_graph

python/pathway/internals/graph_runner/telemetry.py:163–235  ·  view source on GitHub ↗
(graph: ParseGraph)

Source from the content-addressed store, hash-verified

161
162
163def _serialize_graph(graph: ParseGraph) -> str:
164 stack: list[Operator] = list(graph.global_scope.output_nodes)
165 visited: set[Operator] = set(stack)
166 edges_set = set()
167
168 while stack:
169 node = stack.pop()
170 for dependency in node.input_operators():
171 if dependency in graph.global_scope._nodes:
172 edges_set.add((dependency, node))
173 if dependency not in visited:
174 visited.add(dependency)
175 stack.append(dependency)
176
177 nodes = []
178 edges = []
179 groups: dict[str, Any] = {}
180
181 for node in visited:
182 if node.trace.user_frame is None:
183 continue
184
185 user_frame = {
186 "user_frame_function": node.trace.user_frame.function,
187 "user_frame_filename": node.trace.user_frame.filename,
188 "user_frame_line": node.trace.user_frame.line,
189 "user_frame_line_number": node.trace.user_frame.line_number,
190 }
191
192 parent = f"{node.trace.user_frame.filename}:{node.trace.user_frame.line_number}"
193 grandparent = node.trace.user_frame.function
194
195 if grandparent not in groups:
196 groups[grandparent] = {
197 "id": f"g_{len(groups)}",
198 "level": 2,
199 **user_frame,
200 }
201
202 if parent not in groups:
203 groups[parent] = {
204 "id": f"g_{len(groups)}",
205 "level": 1,
206 "parent": groups[grandparent]["id"],
207 **user_frame,
208 }
209
210 nodes.append(
211 {
212 "id": str(node.id),
213 "parent": groups[parent]["id"],
214 "grand_parent": groups[grandparent]["id"],
215 "operator_type": node.operator_type(),
216 "level": 0,
217 **user_frame,
218 }
219 )
220

Callers 1

createMethod · 0.85

Calls 5

input_operatorsMethod · 0.80
valuesMethod · 0.80
addMethod · 0.45
operator_typeMethod · 0.45
dumpsMethod · 0.45

Tested by

no test coverage detected