(self, command: dict, key: str)
| 487 | assert "allow_overwrite" in command, "allow_overwrite is mandatory" |
| 488 | |
| 489 | def persist(self, command: dict, key: str): |
| 490 | self._validate_persist_parameters(command) |
| 491 | |
| 492 | try: |
| 493 | retrieve_func = command["retrieve_func"] |
| 494 | if retrieve_func == OfflineServer.get_historical_features.__name__: |
| 495 | ret_job = self.get_historical_features(command, key) |
| 496 | elif ( |
| 497 | retrieve_func == OfflineServer.pull_latest_from_table_or_query.__name__ |
| 498 | ): |
| 499 | ret_job = self.pull_latest_from_table_or_query(command) |
| 500 | elif retrieve_func == OfflineServer.pull_all_from_table_or_query.__name__: |
| 501 | ret_job = self.pull_all_from_table_or_query(command) |
| 502 | else: |
| 503 | raise NotImplementedError |
| 504 | |
| 505 | data_source = self.store.get_data_source(command["data_source_name"]) |
| 506 | assert_permissions( |
| 507 | resource=data_source, |
| 508 | actions=[AuthzedAction.WRITE_OFFLINE], |
| 509 | ) |
| 510 | storage = SavedDatasetStorage.from_data_source(data_source) |
| 511 | ret_job.persist(storage, command["allow_overwrite"], command["timeout"]) |
| 512 | except Exception as e: |
| 513 | logger.exception(e) |
| 514 | traceback.print_exc() |
| 515 | raise e |
| 516 | |
| 517 | @staticmethod |
| 518 | def _extract_data_source_from_command(command) -> DataSource: |
no test coverage detected