动态加载策略 :param names: 策略名列表,元素为策略的 name 属性
(self, names=None)
| 178 | func(ClockEngine.EventType, strategy.clock) |
| 179 | |
| 180 | def load_strategy(self, names=None): |
| 181 | """动态加载策略 |
| 182 | :param names: 策略名列表,元素为策略的 name 属性""" |
| 183 | s_folder = 'strategies' |
| 184 | self._names = names |
| 185 | strategies = os.listdir(s_folder) |
| 186 | strategies = filter(lambda file: file.endswith('.py') and file != '__init__.py', strategies) |
| 187 | importlib.import_module(s_folder) |
| 188 | for strategy_file in strategies: |
| 189 | self.load(self._names, strategy_file) |
| 190 | # 如果线程没有启动,就启动策略监视线程 |
| 191 | if self.is_watch_strategy and not self._watch_thread.is_alive(): |
| 192 | self.log.warn("启用了动态加载策略功能") |
| 193 | self._watch_thread.start() |
| 194 | |
| 195 | def _load_strategy(self): |
| 196 | while True: |
no test coverage detected