MCPcopy Index your code
hub / github.com/google/adk-python / _get_all_sub_workflows

Method _get_all_sub_workflows

src/google/adk/cli/dev_server.py:1240–1275  ·  view source on GitHub ↗

Recursively discover all sub-workflows within the given app info. Args — app_info: the current app_info snippet or agent dict; current_path: the accumulated string path (e.g., 'agent_a/workflow_b'). Returns: a dictionary mapping each node path to the corresponding agent info dict.

(
      self, app_info: dict, current_path: str = ""
  )

Source from the content-addressed store, hash-verified

1238 return current
1239
1240 def _get_all_sub_workflows(
1241 self, app_info: dict, current_path: str = ""
1242 ) -> dict[str, dict]:
1243 """Recursively discover all sub-workflows within the given app info.
1244
1245 Args:
1246 app_info: Current app_info snippet or agent dict
1247 current_path: The accumulated string path (e.g., 'agent_a/workflow_b')
1248
1249 Returns:
1250 A dictionary mapping the node path to the corresponding agent info dict.
1251 """
1252 workflows = {}
1253
1254 agent_info = app_info.get("root_agent", app_info)
1255 if agent_info.get("graph"):
1256 workflows[current_path] = agent_info
1257
1258 children = list(agent_info.get("sub_agents", []))
1259 children.extend(agent_info.get("nodes", []))
1260 graph = agent_info.get("graph")
1261 if graph:
1262 children.extend(graph.get("nodes", []))
1263
1264 for child in children:
1265 child_name = child.get("name")
1266 if not child_name:
1267 continue
1268 child_path = (
1269 f"{current_path}/{child_name}" if current_path else child_name
1270 )
1271 workflows.update(
1272 self._get_all_sub_workflows({"root_agent": child}, child_path)
1273 )
1274
1275 return workflows
1276
1277 def get_fast_api_app(self, **kwargs):
1278 """Override to add dev endpoints after production endpoints.

Callers 1

get_app_info_image — Method · 0.95

Calls 3

getMethod · 0.45
extendMethod · 0.45
updateMethod · 0.45

Tested by

no test coverage detected