A function for creating a table from a pandas DataFrame. If it contains a special column ``__time__``, rows will be split into batches with timestamps from the column. A special column ``__diff__`` can be used to set an event type - with ``1`` treated as inserting the row and ``-1`` as removing it.
(
df: pd.DataFrame,
id_from: list[str] | None = None,
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
_stacklevel: int = 1,
_new_universe: bool = False,
)
| 356 | @check_arg_types |
| 357 | @trace_user_frame |
| 358 | def table_from_pandas( |
| 359 | df: pd.DataFrame, |
| 360 | id_from: list[str] | None = None, |
| 361 | unsafe_trusted_ids: bool = False, |
| 362 | schema: type[Schema] | None = None, |
| 363 | _stacklevel: int = 1, |
| 364 | _new_universe: bool = False, |
| 365 | ) -> Table: |
| 366 | """A function for creating a table from a pandas DataFrame. If it contains a special |
| 367 | column ``__time__``, rows will be split into batches with timestamps from the column. |
| 368 | A special column ``__diff__`` can be used to set an event type - with ``1`` treated |
| 369 | as inserting the row and ``-1`` as removing it. |
| 370 | """ |
| 371 | if id_from is not None and schema is not None: |
| 372 | raise ValueError("parameters `schema` and `id_from` are mutually exclusive") |
| 373 | |
| 374 | ordinary_columns_names = [ |
| 375 | column for column in df.columns if column not in api.PANDAS_PSEUDOCOLUMNS |
| 376 | ] |
| 377 | if schema is None: |
| 378 | schema = schema_from_pandas( |
| 379 | df, id_from=id_from, exclude_columns=api.PANDAS_PSEUDOCOLUMNS |
| 380 | ) |
| 381 | elif set(ordinary_columns_names) != set(schema.column_names()): |
| 382 | raise ValueError("schema does not match given dataframe") |
| 383 | |
| 384 | _validate_dataframe(df, stacklevel=_stacklevel + 4) |
| 385 | |
| 386 | if id_from is None and schema is not None: |
| 387 | id_from = schema.primary_key_columns() |
| 388 | |
| 389 | if id_from is None: |
| 390 | ids_df = pd.DataFrame({"id": df.index}) |
| 391 | ids_df.index = df.index |
| 392 | else: |
| 393 | ids_df = df[id_from].copy() |
| 394 | |
| 395 | for column in api.PANDAS_PSEUDOCOLUMNS: |
| 396 | if column in df.columns: |
| 397 | ids_df[column] = df[column] |
| 398 | |
| 399 | as_hashes = [fingerprint(x) for x in ids_df.to_dict(orient="records")] |
| 400 | key = fingerprint((unsafe_trusted_ids, sorted(as_hashes))) |
| 401 | |
| 402 | ret: Table = table_from_datasource( |
| 403 | PandasDataSource( |
| 404 | schema=schema, |
| 405 | data=df.copy(), |
| 406 | data_source_options=DataSourceOptions( |
| 407 | unsafe_trusted_ids=unsafe_trusted_ids, |
| 408 | ), |
| 409 | ) |
| 410 | ) |
| 411 | from pathway.internals.parse_graph import G |
| 412 | |
| 413 | if not _new_universe: |
| 414 | if key in G.static_tables_cache: |
| 415 | ret = ret.with_universe_of(G.static_tables_cache[key]) |