MCPcopy
hub / github.com/StructuredLabs/preswald / connect

Method connect

preswald/engine/managers/data.py:395–500  ·  view source on GitHub ↗

Initialize all data sources from config

(self)

Source from the content-addressed store, hash-verified

393 self.duckdb_conn = duckdb.connect(":memory:")
394
395 def connect(self): # noqa: C901
396 """Initialize all data sources from config"""
397 # Useful debugging query - Log final DuckDB state
398 # tables_df = self.duckdb_conn.execute("""
399 # SELECT
400 # table_name,
401 # column_count,
402 # estimated_size as size_b
403 # FROM duckdb_tables()
404 # """).df()
405
406 # logger.info(f"Current DuckDB state - {len(tables_df)} tables:")
407 # for _, row in tables_df.iterrows():
408 # logger.info(
409 # f"Table: {row['table_name']}, Columns: {row['column_count']}, Estimated Size: {row['size_b']:.2f}B"
410 # )
411
412 config = self._load_sources()
413
414 # Only process sources that are new or have changed
415 for name, source_config in config.items():
416 if "type" not in source_config:
417 continue
418
419 if not self._has_source_changed(name, source_config):
420 logger.debug(f"Skipping unchanged source: {name}")
421 continue
422
423 if name in self.sources:
424 self._drop_source_table(self.sources[name])
425
426 source_type = source_config["type"]
427 logger.info(f"Initializing/updating source: {name} ({source_type})")
428
429 try:
430 if source_type == "csv":
431 cfg = CSVConfig(path=source_config["path"])
432 self.sources[name] = CSVSource(name, cfg, self.duckdb_conn)
433
434 elif source_type == "json":
435 cfg = JSONConfig(
436 path=source_config["path"],
437 record_path=source_config.get("record_path"),
438 flatten=source_config.get("flatten", True),
439 )
440 self.sources[name] = JSONSource(name, cfg, self.duckdb_conn)
441
442 elif source_type == "postgres":
443 cfg = PostgresConfig(
444 host=source_config["host"],
445 port=source_config["port"],
446 dbname=source_config["dbname"],
447 user=source_config["user"],
448 password=source_config["password"],
449 )
450 self.sources[name] = PostgresSource(name, cfg, self.duckdb_conn)
451
452 elif source_type == "clickhouse":

Callers 3

connect_data_manager · Method · 0.45
__init__ · Method · 0.45
connect · Function · 0.45

Calls 15

_load_sources · Method · 0.95
_has_source_changed · Method · 0.95
_drop_source_table · Method · 0.95
CSVConfig · Class · 0.85
CSVSource · Class · 0.85
JSONConfig · Class · 0.85
JSONSource · Class · 0.85
PostgresConfig · Class · 0.85
PostgresSource · Class · 0.85
ClickhouseConfig · Class · 0.85
ClickhouseSource · Class · 0.85
APIConfig · Class · 0.85

Tested by

no test coverage detected