MCPcopy
hub / github.com/pathwaycom/pathway / table_from_pandas

Method table_from_pandas

python/pathway/debug/__init__.py:640–696  ·  view source on GitHub ↗

A function for creating a table from a pandas DataFrame. If the DataFrame contains a column ``_time``, rows will be split into batches with timestamps from the ``_time`` column. The ``_worker`` column will be interpreted as the id of the worker that processes the row, and the ``_diff`` column as an event type, with ``1`` treated as inserting a row and ``-1`` as removing it.

(
        self,
        df: pd.DataFrame,
        id_from: list[str] | None = None,
        unsafe_trusted_ids: bool = False,
        schema: type[Schema] | None = None,
        _stacklevel: int = 1,
    )

Source from the content-addressed store, hash-verified

638 )
639
def table_from_pandas(
    self,
    df: pd.DataFrame,
    id_from: list[str] | None = None,
    unsafe_trusted_ids: bool = False,
    schema: type[Schema] | None = None,
    _stacklevel: int = 1,
) -> Table:
    """A function for creating a table from a pandas DataFrame. If the DataFrame
    contains a column ``_time``, rows will be split into batches with timestamps from
    the ``_time`` column. The ``_worker`` column will be interpreted as the id of the
    worker that processes the row, and the ``_diff`` column as an event type, with
    ``1`` treated as inserting a row and ``-1`` as removing it.

    Missing bookkeeping columns get defaults: ``_time`` = 2, ``_worker`` = 0,
    ``_diff`` = 1. The input DataFrame itself is never modified.

    Args:
        df: source frame. It is not mutated; when bookkeeping columns are missing,
            a new frame with those columns added is used internally.
        id_from: forwarded to ``api.ids_from_pandas`` to compute row ids.
        unsafe_trusted_ids: forwarded to ``api.ids_from_pandas`` via
            ``api.ConnectorProperties``.
        schema: schema of the resulting table. When ``None`` it is inferred from
            ``df`` with ``schema_from_pandas``, excluding ``_time``, ``_diff`` and
            ``_worker``.
        _stacklevel: internal; incremented and passed to ``_table_from_dict``.

    Returns:
        The table built by ``self._table_from_dict`` from the batched rows.
    """
    if schema is None:
        schema = schema_from_pandas(
            df, exclude_columns={"_time", "_diff", "_worker"}
        )
    schema, api_schema = read_schema(schema)
    value_fields: list[api.ValueField] = api_schema["value_fields"]

    # Add the missing bookkeeping columns on a *new* frame (``assign`` returns a
    # copy) so the caller's DataFrame is left untouched and no
    # SettingWithCopyWarning is raised when ``df`` is a slice of another frame.
    defaults = {"_time": 2, "_worker": 0, "_diff": 1}
    missing = {
        name: [default] * len(df)
        for name, default in defaults.items()
        if name not in df
    }
    if missing:
        df = df.assign(**missing)

    ids = api.ids_from_pandas(
        df, api.ConnectorProperties(unsafe_trusted_ids=unsafe_trusted_ids), id_from
    )

    # Read every column separately instead of taking whole rows with
    # ``df.iloc[i]``: a row Series coerces all of its values to one common dtype
    # (e.g. ints become floats when the frame also has a float column), which
    # would silently change both the values and the time/worker batch keys.
    # Value columns are looked up only when there are rows to read, so an empty
    # frame combined with an explicit ``schema`` is accepted as before.
    value_columns = (
        [df[field.name] for field in value_fields] if len(df) > 0 else []
    )
    times = df["_time"]
    workers = df["_worker"]
    diffs = df["_diff"]

    # time -> worker -> list of (diff, row id, row values)
    batches: dict[
        int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]
    ] = {}

    for row_index, index_label in enumerate(df.index):
        values = [
            api.denumpify(column.iloc[row_index]) for column in value_columns
        ]
        worker_batches = batches.setdefault(times.iloc[row_index], {})
        worker_batches.setdefault(workers.iloc[row_index], []).append(
            (diffs.iloc[row_index], ids[index_label], values)
        )

    return self._table_from_dict(batches, schema, _stacklevel=_stacklevel + 1)
697

Callers 15

table_from_markdownMethod · 0.95
test_nullFunction · 0.80
test_tokencountFunction · 0.80
test_combine_metadataFunction · 0.80
test_utf8parserFunction · 0.80
test_parse_pypdfFunction · 0.80

Calls 3

_table_from_dictMethod · 0.95
schema_from_pandasFunction · 0.90
read_schemaFunction · 0.90

Tested by 15

test_nullFunction · 0.64
test_tokencountFunction · 0.64
test_combine_metadataFunction · 0.64
test_utf8parserFunction · 0.64
test_parse_pypdfFunction · 0.64
test_write_parquetFunction · 0.64