动态加载策略 :param names: 策略名列表,元素为策略的 name 属性
(self, names=None)
| 91 | self.event_engine.unregister(ClockEngine.EventType, strategy.on_clock) |
| 92 | |
| 93 | def load_strategy(self, names=None): |
| 94 | """动态加载策略 |
| 95 | :param names: 策略名列表,元素为策略的 name 属性""" |
| 96 | s_folder = 'strategies' |
| 97 | self._names = names |
| 98 | strategies = os.listdir(s_folder) |
| 99 | strategies = filter(lambda file: file.endswith('.py') and file != '__init__.py', strategies) |
| 100 | importlib.import_module(s_folder) |
| 101 | for strategy_file in strategies: |
| 102 | self.load(self._names, strategy_file) |
| 103 | # 如果线程没有启动,就启动策略监视线程 |
| 104 | if not self._watch_thread.is_alive(): |
| 105 | self._watch_thread.start() |
| 106 | |
| 107 | def _load_strategy(self): |
| 108 | while True: |
no test coverage detected