MCPcopy Index your code
hub / github.com/agent0ai/agent-zero / AgentContext

Class AgentContext

agent.py:51–316  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

49
50
51class AgentContext:
52
53 _contexts: dict[str, "AgentContext"] = {}
54 _contexts_lock = threading.RLock()
55 _counter: int = 0
56 _notification_manager = None
57
@extension.extensible
def __init__(
    self,
    config: "AgentConfig",
    id: str | None = None,
    name: str | None = None,
    agent0: "Agent|None" = None,
    log: Log.Log | None = None,
    paused: bool = False,
    streaming_agent: "Agent|None" = None,
    created_at: datetime | None = None,
    type: AgentContextType = AgentContextType.USER,
    last_message: datetime | None = None,
    data: dict | None = None,
    output_data: dict | None = None,
    set_current: bool = False,
):
    """Create a context and register it in the class-wide registry.

    The new context is stored in ``AgentContext._contexts`` under its id.
    If a context with the same id is already registered, it is replaced
    and, when it has a task, that task is killed.

    Args:
        config: Configuration stored on the context and passed to the
            default root agent.
        id: Context id; a generated id is used when omitted or empty.
        name: Optional display name.
        agent0: Root agent; when omitted, ``Agent(0, config, self)`` is
            created as the last step of construction.
        log: Log to attach; a fresh ``Log.Log()`` is created when omitted.
            The log's ``context`` attribute is set to this context.
        paused: Initial paused flag.
        streaming_agent: Agent stored as ``streaming_agent``.
        created_at: Creation timestamp; defaults to the localized "now".
        type: Context type.
        last_message: Timestamp of the last message; defaults to the
            localized "now".
        data: Initial data dict; a new empty dict is used when omitted
            or empty.
        output_data: Initial output data dict; a new empty dict is used
            when omitted or empty.
        set_current: When true, ``AgentContext.set_current`` is called
            with this context's id right after registration.
    """
    # initialize context
    self.id = id or AgentContext.generate_id()
    with AgentContext._contexts_lock:
        existing = AgentContext._contexts.get(self.id, None)
        if existing:
            # Pop before re-inserting so the id moves to the end of the
            # registry's insertion order instead of keeping its old slot.
            AgentContext._contexts.pop(self.id, None)
        AgentContext._contexts[self.id] = self
    # Kill the replaced context's task outside the lock so the registry
    # is not held while the task shuts down.
    if existing and existing.task:
        existing.task.kill()
    if set_current:
        AgentContext.set_current(self.id)
    # NOTE(review): the context is visible in the registry before the
    # attributes below are assigned; confirm no other thread can look it
    # up by id during construction.

    # initialize state
    self.name = name
    self.config = config
    self.data = data or {}
    self.output_data = output_data or {}
    self.log = log or Log.Log()
    self.log.context = self
    self.paused = paused
    self.streaming_agent = streaming_agent
    self.task: DeferredTask | None = None
    self.created_at = created_at or Localization.get().now()
    self.type = type
    # The increment and the read must be atomic as a pair; without the
    # lock two contexts created concurrently could receive the same
    # sequence number or lose an increment.
    with AgentContext._contexts_lock:
        AgentContext._counter += 1
        self.no = AgentContext._counter
    self.last_message = last_message or Localization.get().now()

    # initialize agent at last (context is complete now)
    self.agent0 = agent0 or Agent(0, self.config, self)
106
107 @staticmethod
108 def get(id: str):

Calls

no outgoing calls