MCPcopy Index your code
hub / github.com/AutoForgeAI/autoforge / run_agent_session

Function run_agent_session

agent.py:52–134  ·  view source on GitHub ↗

Run a single agent session using Claude Agent SDK. Args: client: Claude SDK client message: The prompt to send project_dir: Project directory path Returns: (status, response_text) where status is: - "continue" if agent should continue working

(
    client: ClaudeSDKClient,
    message: str,
    project_dir: Path,
)

Source from the content-addressed store, hash-verified

50
51
52async def run_agent_session(
53 client: ClaudeSDKClient,
54 message: str,
55 project_dir: Path,
56) -> tuple[str, str]:
57 """
58 Run a single agent session using Claude Agent SDK.
59
60 Args:
61 client: Claude SDK client
62 message: The prompt to send
63 project_dir: Project directory path
64
65 Returns:
66 (status, response_text) where status is:
67 - "continue" if agent should continue working
68 - "error" if an error occurred
69 """
70 print("Sending prompt to Claude Agent SDK...\n")
71
72 try:
73 # Send the query
74 await client.query(message)
75
76 # Collect response text and show tool use
77 response_text = ""
78 async for msg in client.receive_response():
79 msg_type = type(msg).__name__
80
81 # Handle AssistantMessage (text and tool use)
82 if msg_type == "AssistantMessage" and hasattr(msg, "content"):
83 for block in msg.content:
84 block_type = type(block).__name__
85
86 if block_type == "TextBlock" and hasattr(block, "text"):
87 response_text += block.text
88 print(block.text, end="", flush=True)
89 elif block_type == "ToolUseBlock" and hasattr(block, "name"):
90 print(f"\n[Tool: {block.name}]", flush=True)
91 if hasattr(block, "input"):
92 input_str = str(block.input)
93 if len(input_str) > 200:
94 print(f" Input: {input_str[:200]}...", flush=True)
95 else:
96 print(f" Input: {input_str}", flush=True)
97
98 # Handle UserMessage (tool results)
99 elif msg_type == "UserMessage" and hasattr(msg, "content"):
100 for block in msg.content:
101 block_type = type(block).__name__
102
103 if block_type == "ToolResultBlock":
104 result_content = getattr(block, "content", "")
105 is_error = getattr(block, "is_error", False)
106
107 # Check if command was blocked by security hook
108 if "blocked" in str(result_content).lower():
109 print(f" [BLOCKED] {result_content}", flush=True)

Callers 1

run_autonomous_agentFunction · 0.85

Calls 2

is_rate_limit_errorFunction · 0.90
parse_retry_afterFunction · 0.90

Tested by

no test coverage detected