MCPcopy
hub / github.com/AsyncFuncAI/deepwiki-open / prepare_data_pipeline

Function prepare_data_pipeline

api/data_pipeline.py:390–432  ·  view source on GitHub ↗

Creates and returns the data transformation pipeline. Args: embedder_type (str, optional): the embedder type ('openai', 'google', or 'ollama'); if None, it is determined from configuration. is_ollama_embedder (bool, optional): DEPRECATED — use embedder_type instead; if None, it is determined from configuration.

(embedder_type: str = None, is_ollama_embedder: bool = None)

Source from the content-addressed store, hash-verified

388 return documents
389
def prepare_data_pipeline(embedder_type: str = None, is_ollama_embedder: bool = None):
    """
    Build the document transformation pipeline (text splitting followed by embedding).

    Args:
        embedder_type (str, optional): Embedder backend name ('openai', 'google', 'ollama').
            When None, the deprecated flag is consulted first, then the configured type.
        is_ollama_embedder (bool, optional): DEPRECATED. Use embedder_type instead.
            Only consulted when embedder_type is None; a truthy value selects 'ollama',
            any other value defers to configuration.

    Returns:
        adal.Sequential: The splitter chained with the embedding step.
    """
    from api.config import get_embedder_config, get_embedder_type

    # Backward compatibility: the legacy flag can only opt in to Ollama.
    # A falsy flag leaves the type unset so the configuration decides below.
    if embedder_type is None and is_ollama_embedder:
        embedder_type = 'ollama'

    # Nothing chosen explicitly, so fall back to the configured embedder type.
    if embedder_type is None:
        embedder_type = get_embedder_type()

    text_splitter = TextSplitter(**configs["text_splitter"])
    config = get_embedder_config()
    embedder = get_embedder(embedder_type=embedder_type)

    if embedder_type == 'ollama':
        # Ollama path: documents are embedded by the single-document processor.
        return adal.Sequential(
            text_splitter, OllamaDocumentProcessor(embedder=embedder)
        )

    # OpenAI / Google path: embed in batches, sized from the embedder config.
    batch_size = config.get("batch_size", 500)
    batched_embedder = ToEmbeddings(embedder=embedder, batch_size=batch_size)

    # Sequential runs the splitter first and feeds its output to the embedder.
    return adal.Sequential(text_splitter, batched_embedder)
433
434def transform_documents_and_save_to_db(
435 documents: List[Document], db_path: str, embedder_type: str = None, is_ollama_embedder: bool = None

Calls 4

get_embedder_typeFunction · 0.90
get_embedder_configFunction · 0.90
get_embedderFunction · 0.90