hub / github.com/ModelEngine-Group/nexent / process_sync

Function process_sync

backend/data_process/tasks.py:2146–2260

Synchronous process task that returns text directly (for real-time API).

Args:
    source: Source file path, URL, or text content
    source_type: Source of the file ("local", "minio")
    chunking_strategy: Strategy for chunking the document
    timeout: Timeout for the operation

(
        self,
        source: str,
        source_type: str,
        chunking_strategy: str = "basic",
        timeout: int = 30,
        **params
)


@app.task(bind=True, base=LoggingTask, name='data_process.tasks.process_sync')
def process_sync(
        self,
        source: str,
        source_type: str,
        chunking_strategy: str = "basic",
        timeout: int = 30,
        **params
) -> Dict:
    """
    Synchronous process task that returns text directly (for real-time API)

    Args:
        source: Source file path, URL, or text content
        source_type: source of the file("local", "minio")
        chunking_strategy: Strategy for chunking the document
        timeout: Timeout for the operation
        **params: Additional parameters

    Returns:
        Dict containing the extracted text and metadata
    """
    start_time = time.time()
    task_id = self.request.id

    # Check if we're in a valid Celery context before updating state
    is_celery_context = hasattr(
        self, 'request') and self.request.id is not None

    # Update task state to PROCESSING only if in Celery context
    if is_celery_context:
        self.update_state(
            state=states.STARTED,
            meta={
                'source': source,
                'source_type': source_type,
                'task_name': 'process_sync',
                'start_time': start_time,
                'sync_mode': True
            }
        )

    logger.info(
        f"Synchronous processing file: {source} with strategy: {chunking_strategy}")

    # Get the data processor instance
    actor = get_ray_actor()

    try:
        # Process the file based on the source type
        if source_type == "local":
            # The unified actor call, mapping 'file' source_type to 'local' destination
            chunks_ref = actor.process_file.remote(
                source,
                chunking_strategy,
                destination=source_type,
                task_id=task_id,
                **params
            )
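The `is_celery_context` guard in the excerpt can be tried in isolation. The sketch below is a minimal illustration that uses only the standard library: `FakeTask` and `report_started` are hypothetical names, not part of nexent or Celery, and the sketch assumes that `request.id` is `None` when the function runs outside a worker, which is the condition the guard tests.

```python
from types import SimpleNamespace


# Hypothetical stand-in: FakeTask mimics only the two attributes the guard
# touches (request.id and update_state). It is not Celery's Task class.
class FakeTask:
    def __init__(self, task_id=None):
        self.request = SimpleNamespace(id=task_id)
        self.state_updates = []  # records every update_state call

    def update_state(self, state, meta):
        self.state_updates.append((state, meta))


def report_started(task, source, source_type):
    """Record a STARTED state only when the task carries a real request id."""
    is_celery_context = hasattr(task, "request") and task.request.id is not None
    if is_celery_context:
        task.update_state(
            state="STARTED",
            meta={"source": source, "source_type": source_type, "sync_mode": True},
        )
    return is_celery_context


queued = FakeTask(task_id="abc-123")  # simulates dispatch through a worker
direct = FakeTask(task_id=None)       # simulates a plain function call

queued_result = report_started(queued, "/tmp/doc.pdf", "local")
direct_result = report_started(direct, "/tmp/doc.pdf", "local")
```

With these stand-ins, only `queued` records a state update; `direct` skips the call entirely, so a direct invocation never tries to write state for a task id that does not exist.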

Callers

nothing calls this directly

Calls 6

get_ray_actor · Function · 0.85
update_state · Method · 0.80
error · Method · 0.80
remote · Method · 0.45
get · Method · 0.45
join · Method · 0.45
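The source excerpt ends before the result is collected, so how `get`, `join`, and the `timeout` parameter are combined is not shown. One plausible shape, offered purely as an assumption, is a bounded wait on the remote result followed by joining the chunk texts. The sketch below swaps Ray for the standard library's `concurrent.futures` so it runs anywhere; `extract_chunks`, `process_with_timeout`, the `"content"` key, the separator, and the returned dictionary layout are all hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def extract_chunks(source, delay=0.0):
    """Hypothetical stand-in for the remote processing call."""
    time.sleep(delay)
    return [{"content": f"first part of {source}"}, {"content": "second part"}]


def process_with_timeout(source, timeout, delay=0.0):
    """Wait at most `timeout` seconds for chunks, then join their text."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(extract_chunks, source, delay)
    try:
        # Bounded wait, standing in for a blocking get with a timeout.
        chunks = future.result(timeout=timeout)
    except FutureTimeout:
        return {"status": "error", "error": f"timed out after {timeout}s"}
    finally:
        # Do not block on the worker thread if the wait already timed out.
        pool.shutdown(wait=False)
    text = "\n\n".join(chunk["content"] for chunk in chunks)
    return {"status": "ok", "text": text, "chunks_count": len(chunks)}


ok = process_with_timeout("report.txt", timeout=1.0)
slow = process_with_timeout("report.txt", timeout=0.05, delay=0.3)
```

Returning an error dictionary instead of raising keeps the caller's handling uniform for a real-time API, though whether `process_sync` itself returns or raises on timeout cannot be determined from the excerpt.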

Tested by

no test coverage detected