Engine to support async events.
| 30 | |
| 31 | |
| 32 | class Engine: |
| 33 | """Engine to support async events.""" |
| 34 | |
| 35 | def __init__(self, exchange: Exchange, description: str = "", *, verbose: bool = False) -> None: |
| 36 | self.events: list[Event] = [] |
| 37 | self._num_event = 0 |
| 38 | self.verbose = verbose |
| 39 | self.exchange: Exchange = exchange |
| 40 | self.description = description |
| 41 | self.agents: dict[int, BaseAgent] = {} |
| 42 | self.order_owner: dict[tuple[str, int], int] = {} # symbol, order_id -> agent_id |
| 43 | |
def has_event(self) -> bool:
    """Return True when at least one event is still queued."""
    return bool(self.events)
| 47 | |
def register_agent(self, agent: BaseAgent) -> None:
    """Give ``agent`` the next sequential id and store it in ``self.agents``."""
    # The id is the current registry size, so ids run 0, 1, 2, ... in registration order.
    agent.agent_id = len(self.agents)
    self.agents[agent.agent_id] = agent
| 53 | |
def push_event(self, event: Event) -> None:
    """Assign an id to ``event``, push it onto the event heap, and log it.

    A ``MarketCloseEvent`` always receives the fixed id 1_000_000_000; every
    other event receives the current value of ``self._num_event``, which is
    then incremented.
    """
    close_event_id = 1_000_000_000
    if not isinstance(event, MarketCloseEvent):
        event.event_id = self._num_event
        self._num_event += 1
        # Sequential ids must stay strictly below the id reserved for market close.
        assert event.event_id < close_event_id
    else:
        event.event_id = close_event_id
    heapq.heappush(self.events, event)
    self._log(f"received new event, type: {event.__class__.__name__}, {event.time}")
| 65 | |
def push_events(self, events: list[Event]) -> None:
    """Push each event in ``events``, in the given order, via ``push_event``."""
    for pending in events:
        self.push_event(pending)
| 70 | |
def run(self) -> None:
    """Pop and handle queued events until the queue is empty, then log a summary.

    A ``TimeProgress`` built from the exchange's open and close times is
    updated with each handled event's ``time``.
    """
    began = Timestamp.now()
    tracker: TimeProgress = TimeProgress(
        self.exchange.config.open_time,
        self.exchange.config.close_time,
        description=self.description,
        unit="s",
    )
    with tracker.progress:
        # The queue is re-checked on every iteration.
        while self.events:
            current = self._pop_event()
            self._handle_event(current)
            tracker.update(current.time)
    self._log(f"finished processing {self._num_event} events in {(Timestamp.now() - began).total_seconds()}s.")
| 83 | |
| 84 | def _pop_event(self) -> Event: |
| 85 | """Pop event.""" |
| 86 | assert self.events |
| 87 | event: Event = heapq.heappop(self.events) |
| 88 | self._log(f"handling event, type: {event.__class__.__name__}, {event.time}") |
| 89 | return event |
no outgoing calls