Process price data from FMP API. :return:
(self, save_dir: str)
| 120 | return await task.run() |
| 121 | |
async def _process(self, save_dir: str):
    """
    Build one processor per symbol, run them in batches, and summarize the results.

    The processor class is looked up in ``self.processor_map`` by
    ``self.source`` and then by ``self.data_type``. One instance is created
    for every entry of ``self.symbols`` and executed through
    ``self.run_task``, at most ``self.max_concurrent`` at a time.

    :param save_dir: Directory handed to every processor as ``workdir``;
        it is created if it does not exist yet.
    :return: Dict with ``source``, ``data_type``, ``dates`` (symbol ->
        ``start_date`` / ``end_date`` read from each task result) and
        ``names`` (the ``names`` entry of the first task result, or ``[]``
        when no task produced a result).
    :raises AssertionError: If no processor is registered for the source or
        for the data type under that source.
    """
    info = {
        "source": self.source,
        "data_type": self.data_type,
    }
    os.makedirs(save_dir, exist_ok=True)

    # The processor class depends only on source/data_type, so resolve and
    # validate it once instead of repeating the checks for every symbol.
    assert self.source in self.processor_map, f"Source {self.source} not supported"
    source_processors = self.processor_map[self.source]
    assert self.data_type in source_processors, (
        f"Data type {self.data_type} not supported for source {self.source}"
    )
    processor_cls = source_processors[self.data_type]

    tasks = [
        processor_cls(
            data_path=self.data_path,
            start_date=self.start_date,
            end_date=self.end_date,
            level=self.level,
            format=self.format,
            max_concurrent=self.max_concurrent,
            symbol_info=self._get_sybmol_info(symbol),
            workdir=save_dir,
            feature_type=self.feature_type,
        )
        for symbol in self.symbols
    ]

    # Run the processors batch by batch so that no more than
    # ``max_concurrent`` of them are awaited at the same time.
    # Slicing already clamps at the end of the list.
    asset_info = []
    batch_size = self.max_concurrent
    for start in range(0, len(tasks), batch_size):
        batch = tasks[start:start + batch_size]
        batch_info = await asyncio.gather(*(self.run_task(task) for task in batch))
        asset_info.extend(batch_info)

    info["dates"] = {
        item["symbol"]: {
            "start_date": item.get("start_date"),
            "end_date": item.get("end_date"),
        }
        for item in asset_info
    }
    # With no symbols there is no first result to read the names from;
    # fall back to an empty list instead of raising IndexError.
    info["names"] = asset_info[0].get("names", []) if asset_info else []

    return info
| 171 | |
| 172 | async def run(self): |
| 173 | logger.info(f"| Processing {self.data_type} data from {self.source}...") |