Upsert an edge and its properties between two nodes identified by their names. Args: source_node_id (str): Name of the source node (used as identifier) target_node_id (str): Name of the target node (used as identifier) edge_data (dict): Dictionar
(
self, source_node_id: str, target_node_id: str, edge_data: Dict[str, Any]
)
| 392 | retry=retry_if_exception_type((GremlinServerError,)), |
| 393 | ) |
| 394 | async def upsert_edge( |
| 395 | self, source_node_id: str, target_node_id: str, edge_data: Dict[str, Any] |
| 396 | ): |
| 397 | """ |
| 398 | Upsert an edge and its properties between two nodes identified by their names. |
| 399 | |
| 400 | Args: |
| 401 | source_node_id (str): Name of the source node (used as identifier) |
| 402 | target_node_id (str): Name of the target node (used as identifier) |
| 403 | edge_data (dict): Dictionary of properties to set on the edge |
| 404 | """ |
| 405 | source_node_name = GremlinStorage._fix_name(source_node_id) |
| 406 | target_node_name = GremlinStorage._fix_name(target_node_id) |
| 407 | edge_properties = GremlinStorage._convert_properties(edge_data) |
| 408 | |
| 409 | query = f"""g |
| 410 | .V().has('graph', {self.graph_name}) |
| 411 | .has('entity_name', {source_node_name}).as('source') |
| 412 | .V().has('graph', {self.graph_name}) |
| 413 | .has('entity_name', {target_node_name}).as('target') |
| 414 | .coalesce( |
| 415 | __.select('source').outE('DIRECTED').where(__.inV().as('target')), |
| 416 | __.select('source').addE('DIRECTED').to(__.select('target')) |
| 417 | ) |
| 418 | .property('graph', {self.graph_name}) |
| 419 | {edge_properties} |
| 420 | """ |
| 421 | try: |
| 422 | await self._query(query) |
| 423 | logger.debug( |
| 424 | "Upserted edge from {%s} to {%s} with properties: {%s}", |
| 425 | source_node_name, |
| 426 | target_node_name, |
| 427 | edge_properties, |
| 428 | ) |
| 429 | except Exception as e: |
| 430 | logger.error("Error during edge upsert: {%s}", e) |
| 431 | raise |
| 432 | |
| 433 | async def _node2vec_embed(self): |
| 434 | print("Implemented but never called.") |
nothing calls this directly
no test coverage detected