(self, context: fl.ServerCallContext, ticket: fl.Ticket)
@inject_user_details_decorator
@arrow_server_error_handling_decorator
def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket):
    """Run the API named by a registered flight and stream back its result.

    The ticket payload is decoded and parsed with ``ast.literal_eval`` into
    the key used to look up ``self.flights``. The key's second element is a
    JSON-encoded command whose ``"api"`` entry selects which method runs.
    After a successful call the flight entry is removed from
    ``self.flights``.

    Args:
        context: Flight call context; not referenced in this method's body.
        ticket: Ticket whose ``ticket`` bytes hold the literal form of the
            flight key.

    Returns:
        An ``fl.RecordBatchStream`` wrapping the produced table, or ``None``
        when the key is not present in ``self.flights``.

    Raises:
        NotImplementedError: If the requested api is not one of the four
            handled below.
        Exception: Any error raised by the selected API is logged and
            re-raised unchanged.
    """
    key = ast.literal_eval(ticket.ticket.decode())
    if key not in self.flights:
        logger.error(f"Unknown key {key}")
        # NOTE(review): a Flight do_get handler is normally expected to
        # return a data stream; confirm how the surrounding decorators and
        # the Flight runtime treat this None before changing it to a raise.
        return None

    command = json.loads(key[1])

    self._validate_do_get_parameters(command)

    # presumably _validate_do_get_parameters guarantees "api" is present;
    # verify against that helper.
    api = command["api"]
    logger.debug(f"get command is {command}")
    logger.debug(f"requested api is {api}")
    try:
        if api == OfflineServer.get_historical_features.__name__:
            table = self.get_historical_features(command, key).to_arrow()
        elif api == OfflineServer.pull_all_from_table_or_query.__name__:
            table = self.pull_all_from_table_or_query(command).to_arrow()
        elif api == OfflineServer.pull_latest_from_table_or_query.__name__:
            table = self.pull_latest_from_table_or_query(command).to_arrow()
        elif (
            api
            == OfflineServer.get_table_column_names_and_types_from_data_source.__name__
        ):
            # Unlike the other branches, this result is used as-is, with no
            # .to_arrow() conversion.
            table = self.get_table_column_names_and_types_from_data_source(command)
        else:
            # Name the offending api so the log entry and the client error
            # are actionable.
            raise NotImplementedError(f"Unsupported api {api}")
    except Exception as e:
        # logger.exception already records the full traceback, so a separate
        # traceback.print_exc() would only duplicate it.
        logger.exception(e)
        # Bare raise re-raises the active exception unchanged.
        raise

    # Get service is consumed, so we clear the corresponding flight and data.
    # This line is only reached on success; a failed call leaves the entry
    # in self.flights.
    del self.flights[key]
    return fl.RecordBatchStream(table)
| 268 | |
| 269 | def _validate_offline_write_batch_parameters(self, command: dict): |
| 270 | assert "feature_view_names" in command, ( |
nothing calls this directly
no test coverage detected