Execute a request with in-memory de-duplication. Call this method to execute a request with in-memory de-duplication. If the request (identified by the request_id) has previously been executed and the result is still cached, the result will be returned immediately. I
(
self,
func: Callable[..., Any],
deduplicate: Optional[bool],
*args: Any,
**kwargs: Any,
)
| 541 | request_record.set_result(result) |
| 542 | |
| 543 | async def execute( |
| 544 | self, |
| 545 | func: Callable[..., Any], |
| 546 | deduplicate: Optional[bool], |
| 547 | *args: Any, |
| 548 | **kwargs: Any, |
| 549 | ) -> Any: |
| 550 | """Execute a request with in-memory de-duplication. |
| 551 | |
| 552 | Call this method to execute a request with in-memory de-duplication. |
| 553 | If the request (identified by the request_id) has previously been |
| 554 | executed and the result is still cached, the result will be returned |
| 555 | immediately. If the request is in-flight, the method will wait for it |
| 556 | to complete. If the request is not in-flight, the method will start |
| 557 | execution in the background and cache the result. |
| 558 | |
| 559 | Args: |
| 560 | func: The function to execute. |
| 561 | deduplicate: Whether to enable or disable request deduplication for |
| 562 | this request. If not specified, by default, the deduplication |
| 563 | is enabled for POST requests and disabled for other requests. |
| 564 | *args: The arguments to pass to the function. |
| 565 | **kwargs: The keyword arguments to pass to the function. |
| 566 | |
| 567 | Returns: |
| 568 | The result of the request. |
| 569 | """ |
| 570 | from zenml.zen_server.utils import get_system_metrics |
| 571 | |
| 572 | request_context = self.current_request |
| 573 | assert request_context is not None |
| 574 | |
| 575 | logger.debug( |
| 576 | "endpoint.started", |
| 577 | extra={ |
| 578 | "endpoint": func.__name__, |
| 579 | **get_system_metrics(), |
| 580 | }, |
| 581 | ) |
| 582 | |
| 583 | transaction_id = request_context.transaction_id |
| 584 | |
| 585 | if deduplicate is None: |
| 586 | # If not specified, by default, the deduplication is enabled for |
| 587 | # POST requests and disabled for other requests. |
| 588 | deduplicate = request_context.request.method == "POST" |
| 589 | |
| 590 | deduplicate = ( |
| 591 | deduplicate and self.deduplicate and request_context.is_cacheable |
| 592 | ) |
| 593 | |
| 594 | async with self.lock: |
| 595 | if deduplicate and transaction_id in self.transactions: |
| 596 | # The transaction is still being processed on the same |
| 597 | # server instance. We just wait for it to complete. |
| 598 | fut = self.transactions[transaction_id].future |
| 599 | logger.debug( |
| 600 | "endpoint.resumed", |