MCPcopy Index your code
hub / github.com/microsoft/MarS / Engine

Class Engine

mlib/core/engine.py:32–347  ·  view source on GitHub ↗

Engine to support async events.

Source from the content-addressed store, hash-verified

30
31
32class Engine:
33 """Engine to support async events."""
34
def __init__(self, exchange: Exchange, description: str = "", *, verbose: bool = False) -> None:
    """Create an engine bound to ``exchange`` with no pending events or agents.

    Args:
        exchange: Stored unchanged as ``self.exchange``.
        description: Stored unchanged as ``self.description``.
        verbose: Keyword-only flag stored as ``self.verbose``.
    """
    # Values supplied by the caller.
    self.exchange: Exchange = exchange
    self.description = description
    self.verbose = verbose

    # Pending events and the counter used to hand out event ids.
    self.events: list[Event] = []
    self._num_event = 0

    # Registered agents keyed by agent id.
    self.agents: dict[int, BaseAgent] = {}
    # (symbol, order_id) -> agent_id
    self.order_owner: dict[tuple[str, int], int] = {}
43
def has_event(self) -> bool:
    """Report whether at least one event is still pending."""
    return bool(self.events)
47
def register_agent(self, agent: BaseAgent) -> None:
    """Assign ``agent`` an id and store it in ``self.agents`` under that id.

    The id is the number of agents held in ``self.agents`` at the time of
    the call.
    """
    registry = self.agents
    agent.agent_id = len(registry)
    registry[agent.agent_id] = agent
53
def push_event(self, event: Event) -> None:
    """Stamp ``event`` with an id, add it to the event heap and log it.

    A ``MarketCloseEvent`` always receives the fixed id 1000000000. Every
    other event receives the current value of ``self._num_event``, which is
    then incremented; that id must stay below the fixed close-event id.
    """
    # NOTE(review): presumably the large fixed id makes a close event order
    # after ordinary events with an equal timestamp -- confirm against the
    # comparison defined on Event.
    id_ceiling = 1000000000
    if not isinstance(event, MarketCloseEvent):
        event.event_id = self._num_event
        self._num_event += 1
        assert event.event_id < id_ceiling
    else:
        event.event_id = id_ceiling
    heapq.heappush(self.events, event)
    kind = event.__class__.__name__
    self._log(f"received new event, type: {kind}, {event.time}")
65
def push_events(self, events: list[Event]) -> None:
    """Push every event in ``events``, in order, through ``push_event``."""
    for pending in events:
        self.push_event(pending)
70
def run(self) -> None:
    """Process pending events until none remain, then log a summary.

    Each event is popped with ``_pop_event`` and passed to ``_handle_event``,
    and the progress tracker is updated with the event's ``time``. The
    tracker is built from the exchange config's ``open_time`` and
    ``close_time``. The final log line reports ``self._num_event`` and the
    elapsed wall-clock seconds.
    """
    started_at = Timestamp.now()
    exchange = self.exchange
    tracker: TimeProgress = TimeProgress(
        exchange.config.open_time,
        exchange.config.close_time,
        description=self.description,
        unit="s",
    )
    with tracker.progress:
        # Handling an event may push further events, so re-check each pass.
        while self.events:
            current = self._pop_event()
            self._handle_event(current)
            tracker.update(current.time)
    self._log(f"finished processing {self._num_event} events in {(Timestamp.now() - started_at).total_seconds()}s.")
83
def _pop_event(self) -> Event:
    """Pop the smallest event off the heap, log it, and return it.

    The caller must ensure at least one event is pending.
    """
    assert self.events
    head: Event = heapq.heappop(self.events)
    kind = head.__class__.__name__
    self._log(f"handling event, type: {kind}, {head.time}")
    return head

Callers 3

test_priority_queue · Function · 0.90
test_engine_run · Function · 0.90

Calls

no outgoing calls

Tested by 3

test_priority_queue · Function · 0.72
test_engine_run · Function · 0.72