MCPcopy Index your code
hub / github.com/pathwaycom/pathway / table_from_pandas

Function table_from_pandas

python/pathway/debug/__init__.py:358–419  ·  view source on GitHub ↗

A function for creating a table from a pandas DataFrame. If it contains a special column ``__time__``, rows will be split into batches with timestamps from the column. A special column ``__diff__`` can be used to set an event type - with ``1`` treated as inserting the row and ``-1`` as removing it.

(
    df: pd.DataFrame,
    id_from: list[str] | None = None,
    unsafe_trusted_ids: bool = False,
    schema: type[Schema] | None = None,
    _stacklevel: int = 1,
    _new_universe: bool = False,
)

Source from the content-addressed store, hash-verified

356@check_arg_types
357@trace_user_frame
358def table_from_pandas(
359 df: pd.DataFrame,
360 id_from: list[str] | None = None,
361 unsafe_trusted_ids: bool = False,
362 schema: type[Schema] | None = None,
363 _stacklevel: int = 1,
364 _new_universe: bool = False,
365) -> Table:
366 """A function for creating a table from a pandas DataFrame. If it contains a special
367 column ``__time__``, rows will be split into batches with timestamps from the column.
368 A special column ``__diff__`` can be used to set an event type - with ``1`` treated
369 as inserting the row and ``-1`` as removing it.
370 """
371 if id_from is not None and schema is not None:
372 raise ValueError("parameters `schema` and `id_from` are mutually exclusive")
373
374 ordinary_columns_names = [
375 column for column in df.columns if column not in api.PANDAS_PSEUDOCOLUMNS
376 ]
377 if schema is None:
378 schema = schema_from_pandas(
379 df, id_from=id_from, exclude_columns=api.PANDAS_PSEUDOCOLUMNS
380 )
381 elif set(ordinary_columns_names) != set(schema.column_names()):
382 raise ValueError("schema does not match given dataframe")
383
384 _validate_dataframe(df, stacklevel=_stacklevel + 4)
385
386 if id_from is None and schema is not None:
387 id_from = schema.primary_key_columns()
388
389 if id_from is None:
390 ids_df = pd.DataFrame({"id": df.index})
391 ids_df.index = df.index
392 else:
393 ids_df = df[id_from].copy()
394
395 for column in api.PANDAS_PSEUDOCOLUMNS:
396 if column in df.columns:
397 ids_df[column] = df[column]
398
399 as_hashes = [fingerprint(x) for x in ids_df.to_dict(orient="records")]
400 key = fingerprint((unsafe_trusted_ids, sorted(as_hashes)))
401
402 ret: Table = table_from_datasource(
403 PandasDataSource(
404 schema=schema,
405 data=df.copy(),
406 data_source_options=DataSourceOptions(
407 unsafe_trusted_ids=unsafe_trusted_ids,
408 ),
409 )
410 )
411 from pathway.internals.parse_graph import G
412
413 if not _new_universe:
414 if key in G.static_tables_cache:
415 ret = ret.with_universe_of(G.static_tables_cache[key])

Callers 15

_pandas_transformerFunction · 0.90
load_mnist_sampleFunction · 0.90
test_unaryFunction · 0.90
test_int_div_zeroFunction · 0.90
test_int_pow_shiftFunction · 0.90
test_float_powFunction · 0.90
test_float_div_zeroFunction · 0.90
test_mixed_int_float_powFunction · 0.90
test_duration_div_zeroFunction · 0.90
test_duration_and_intFunction · 0.90

Calls 11

schema_from_pandasFunction · 0.90
fingerprintFunction · 0.90
table_from_datasourceFunction · 0.90
PandasDataSourceClass · 0.90
DataSourceOptionsClass · 0.90
_validate_dataframeFunction · 0.85
primary_key_columnsMethod · 0.80
with_universe_ofMethod · 0.80
column_namesMethod · 0.45
copyMethod · 0.45
to_dictMethod · 0.45

Tested by 15

test_unaryFunction · 0.72
test_int_div_zeroFunction · 0.72
test_int_pow_shiftFunction · 0.72
test_float_powFunction · 0.72
test_float_div_zeroFunction · 0.72
test_mixed_int_float_powFunction · 0.72
test_duration_div_zeroFunction · 0.72
test_duration_and_intFunction · 0.72