MCPcopy Index your code
hub / github.com/shidenggui/easyquant / load

Method load

custom/fixedmainengine.py:41–75  ·  view source on GitHub ↗
(self, names, strategy_file)

Source from the content-addressed store, hash-verified

39 self.quotation_engines.append(quotation_engine(self.event_engine, self.clock_engine, positions))
40
def load(self, names, strategy_file):
    """Load, or hot-reload, the strategy defined in ``strategies/<strategy_file>``.

    The file's modification time is compared with the one recorded in
    ``self._cache``:

    * same mtime -> the file is unchanged and nothing is done;
    * a different mtime is recorded -> the previously started process is
      unbound, stopped and dropped from ``self.strategy_list``, then the
      module is re-executed with ``importlib.reload``;
    * no mtime recorded -> the module is imported from the ``strategies``
      package.

    The module's ``Strategy`` class is instantiated and started only when
    ``names`` is ``None`` or contains ``Strategy.name``.

    :param names: container of strategy names allowed to load, or ``None``
        to accept every strategy.
    :param strategy_file: file name of the strategy module, relative to the
        ``strategies`` directory (e.g. ``"demo.py"``).
    :raises OSError: if the strategy file cannot be stat'ed.
    :raises AttributeError: if the module defines no ``Strategy`` class.
    """
    with self.lock:
        mtime = os.path.getmtime(os.path.join('strategies', strategy_file))
        # Strip the extension explicitly instead of assuming a 3-char ".py".
        strategy_module_name = os.path.splitext(os.path.basename(strategy_file))[0]

        cached_mtime = self._cache.get(strategy_file)
        if cached_mtime == mtime:
            # File unchanged since it was last loaded.
            return

        # A recorded (but different) mtime means the file was loaded before
        # and has been modified since: the old process must go first.
        needs_reload = cached_mtime is not None
        if needs_reload:
            _process = self._process_map.get(strategy_file)
            self.unbind_event(_process)
            _process.stop()
            # Drop the stopped wrapper so strategy_list does not accumulate
            # one dead entry per reload. Filter by identity and assign in
            # place so other references to the list stay valid.
            self.strategy_list[:] = [
                proc for proc in self.strategy_list if proc is not _process
            ]
            self.log.info(u'卸载策略: %s' % strategy_module_name)
            # NOTE(review): presumably gives the old process time to exit
            # before the module is reloaded — confirm.
            time.sleep(2)

        if needs_reload:
            strategy_module = importlib.reload(self._modules[strategy_file])
        else:
            strategy_module = importlib.import_module('.' + strategy_module_name, 'strategies')
        self._modules[strategy_file] = strategy_module

        strategy_class = getattr(strategy_module, 'Strategy')
        if names is None or strategy_class.name in names:
            self.strategies[strategy_module_name] = strategy_class
            # Wrap the strategy instance in a process wrapper.
            _process = ProcessWrapper(strategy_class(self.user, log_handler=self.log, main_engine=self))
            # Record the load so later calls can detect changes and unload it.
            self._process_map[strategy_file] = _process
            self.strategy_list.append(_process)
            self._cache[strategy_file] = mtime
            self.bind_event(_process)
            self.log.info(u'加载策略: %s' % strategy_module_name)
76
77 def bind_event(self, strategy):
78 """

Callers 1

load_strategyMethod · 0.95

Calls 4

unbind_eventMethod · 0.95
stopMethod · 0.95
bind_eventMethod · 0.95
ProcessWrapperClass · 0.90

Tested by

no test coverage detected