(self, query: str, params: Optional[QueryParam] = None, response_model = None)
| 144 | raise e |
| 145 | |
def query(
    self,
    query: str,
    params: Optional[QueryParam] = None,
    response_model=None,
) -> TQueryResponse[GTNode, GTEdge, GTHash, GTChunk]:
    """Synchronous entry point that runs ``async_query`` to completion.

    The work is wrapped in a coroutine and driven with
    ``get_event_loop().run_until_complete(...)``, so the call blocks until
    the asynchronous query has finished.

    Args:
        query: Query text, forwarded unchanged to ``async_query``.
        params: Optional query parameters, forwarded unchanged.
        response_model: Optional value forwarded unchanged to
            ``async_query`` as its third positional argument.

    Returns:
        Whatever ``async_query`` returns.

    Raises:
        Exception: Any exception raised by ``async_query`` is logged via
            ``logger.error`` and then re-raised to the caller.
    """

    async def _run_query() -> TQueryResponse[GTNode, GTEdge, GTHash, GTChunk]:
        # query_start() is awaited *before* entering the try block, so if it
        # raises, query_done() below is not awaited.
        await self.state_manager.query_start()
        try:
            return await self.async_query(query, params, response_model)
        except Exception as exc:
            logger.error(f"Error during query: {exc}")
            raise exc
        finally:
            # Runs on both the success and the failure path of async_query.
            await self.state_manager.query_done()

    loop = get_event_loop()
    return loop.run_until_complete(_run_query())
| 159 | |
| 160 | async def async_query( |
| 161 | self, query: Optional[str], params: Optional[QueryParam] = None, response_model = None |
no test coverage detected