Synchronous process task that returns text directly (for real-time API). Args: source: Source file path, URL, or text content; source_type: source of the file ("local", "minio"); chunking_strategy: Strategy for chunking the document; timeout: Timeout for the operation
(
self,
source: str,
source_type: str,
chunking_strategy: str = "basic",
timeout: int = 30,
**params
)
| 2144 | |
| 2145 | @app.task(bind=True, base=LoggingTask, name='data_process.tasks.process_sync') |
| 2146 | def process_sync( |
| 2147 | self, |
| 2148 | source: str, |
| 2149 | source_type: str, |
| 2150 | chunking_strategy: str = "basic", |
| 2151 | timeout: int = 30, |
| 2152 | **params |
| 2153 | ) -> Dict: |
| 2154 | """ |
| 2155 | Synchronous process task that returns text directly (for real-time API) |
| 2156 | |
| 2157 | Args: |
| 2158 | source: Source file path, URL, or text content |
| 2159 | source_type: source of the file("local", "minio") |
| 2160 | chunking_strategy: Strategy for chunking the document |
| 2161 | timeout: Timeout for the operation |
| 2162 | **params: Additional parameters |
| 2163 | |
| 2164 | Returns: |
| 2165 | Dict containing the extracted text and metadata |
| 2166 | """ |
| 2167 | start_time = time.time() |
| 2168 | task_id = self.request.id |
| 2169 | |
| 2170 | # Check if we're in a valid Celery context before updating state |
| 2171 | is_celery_context = hasattr( |
| 2172 | self, 'request') and self.request.id is not None |
| 2173 | |
| 2174 | # Update task state to PROCESSING only if in Celery context |
| 2175 | if is_celery_context: |
| 2176 | self.update_state( |
| 2177 | state=states.STARTED, |
| 2178 | meta={ |
| 2179 | 'source': source, |
| 2180 | 'source_type': source_type, |
| 2181 | 'task_name': 'process_sync', |
| 2182 | 'start_time': start_time, |
| 2183 | 'sync_mode': True |
| 2184 | } |
| 2185 | ) |
| 2186 | |
| 2187 | logger.info( |
| 2188 | f"Synchronous processing file: {source} with strategy: {chunking_strategy}") |
| 2189 | |
| 2190 | # Get the data processor instance |
| 2191 | actor = get_ray_actor() |
| 2192 | |
| 2193 | try: |
| 2194 | # Process the file based on the source type |
| 2195 | if source_type == "local": |
| 2196 | # The unified actor call, mapping 'file' source_type to 'local' destination |
| 2197 | chunks_ref = actor.process_file.remote( |
| 2198 | source, |
| 2199 | chunking_strategy, |
| 2200 | destination=source_type, |
| 2201 | task_id=task_id, |
| 2202 | **params |
| 2203 | ) |
nothing calls this directly
no test coverage detected