MCPcopy
hub / github.com/zenml-io/zenml / execute

Method execute

src/zenml/zen_server/request_management.py:543–669  ·  view source on GitHub ↗

Execute a request with in-memory de-duplication. Call this method to execute a request with in-memory de-duplication. If the request (identified by the request_id) has previously been executed and the result is still cached, the result will be returned immediately. I

(
        self,
        func: Callable[..., Any],
        deduplicate: Optional[bool],
        *args: Any,
        **kwargs: Any,
    )

Source from the content-addressed store, hash-verified

541 request_record.set_result(result)
542
543 async def execute(
544 self,
545 func: Callable[..., Any],
546 deduplicate: Optional[bool],
547 *args: Any,
548 **kwargs: Any,
549 ) -> Any:
550 """Execute a request with in-memory de-duplication.
551
552 Call this method to execute a request with in-memory de-duplication.
553 If the request (identified by the request_id) has previously been
554 executed and the result is still cached, the result will be returned
555 immediately. If the request is in-flight, the method will wait for it
556 to complete. If the request is not in-flight, the method will start
557 execution in the background and cache the result.
558
559 Args:
560 func: The function to execute.
561 deduplicate: Whether to enable or disable request deduplication for
562 this request. If not specified, by default, the deduplication
563 is enabled for POST requests and disabled for other requests.
564 *args: The arguments to pass to the function.
565 **kwargs: The keyword arguments to pass to the function.
566
567 Returns:
568 The result of the request.
569 """
570 from zenml.zen_server.utils import get_system_metrics
571
572 request_context = self.current_request
573 assert request_context is not None
574
575 logger.debug(
576 "endpoint.started",
577 extra={
578 "endpoint": func.__name__,
579 **get_system_metrics(),
580 },
581 )
582
583 transaction_id = request_context.transaction_id
584
585 if deduplicate is None:
586 # If not specified, by default, the deduplication is enabled for
587 # POST requests and disabled for other requests.
588 deduplicate = request_context.request.method == "POST"
589
590 deduplicate = (
591 deduplicate and self.deduplicate and request_context.is_cacheable
592 )
593
594 async with self.lock:
595 if deduplicate and transaction_id in self.transactions:
596 # The transaction is still being processed on the same
597 # server instance. We just wait for it to complete.
598 fut = self.transactions[transaction_id].future
599 logger.debug(
600 "endpoint.resumed",

Calls 3

get_system_metricsFunction · 0.90
RequestRecordClass · 0.85