MCPcopy
hub / github.com/stitionai/devika / ProjectManager

Class ProjectManager

src/project.py:17–175  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

15
16
17class ProjectManager:
18 def __init__(self):
19 config = Config()
20 sqlite_path = config.get_sqlite_db()
21 self.project_path = config.get_projects_dir()
22 self.engine = create_engine(f"sqlite:///{sqlite_path}")
23 SQLModel.metadata.create_all(self.engine)
24
25 def new_message(self):
26 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
27
28 return {
29 "from_devika": True,
30 "message": None,
31 "timestamp": timestamp
32 }
33
34 def create_project(self, project: str):
35 with Session(self.engine) as session:
36 project_state = Projects(project=project, message_stack_json=json.dumps([]))
37 session.add(project_state)
38 session.commit()
39
40 def delete_project(self, project: str):
41 with Session(self.engine) as session:
42 project_state = session.query(Projects).filter(Projects.project == project).first()
43 if project_state:
44 session.delete(project_state)
45 session.commit()
46
47 def add_message_to_project(self, project: str, message: dict):
48 with Session(self.engine) as session:
49 project_state = session.query(Projects).filter(Projects.project == project).first()
50 if project_state:
51 message_stack = json.loads(project_state.message_stack_json)
52 message_stack.append(message)
53 project_state.message_stack_json = json.dumps(message_stack)
54 session.commit()
55 else:
56 message_stack = [message]
57 project_state = Projects(project=project, message_stack_json=json.dumps(message_stack))
58 session.add(project_state)
59 session.commit()
60
61 def add_message_from_devika(self, project: str, message: str):
62 new_message = self.new_message()
63 new_message["message"] = message
64 emit_agent("server-message", {"messages": new_message})
65 self.add_message_to_project(project, new_message)
66
67 def add_message_from_user(self, project: str, message: str):
68 new_message = self.new_message()
69 new_message["message"] = message
70 new_message["from_devika"] = False
71 emit_agent("server-message", {"messages": new_message})
72 self.add_message_to_project(project, new_message)
73
74 def get_messages(self, project: str):

Callers 5

devika.pyFile · 0.90
deployMethod · 0.90
project.pyFile · 0.90
__init__Method · 0.90
run_codeMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected