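A function for creating a table from a pandas DataFrame. If the DataFrame contains a ``_time`` column, rows are split into batches using the timestamps from that column. A ``_worker`` column, if present, is interpreted as the id of the worker that will process the row, and a ``_diff`` column as the event type, with ``1`` meaning the row is inserted and ``-1`` meaning it is removed. Missing columns fall back to ``_time = 2``, ``_worker = 0`` and ``_diff = 1``.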
(
self,
df: pd.DataFrame,
id_from: list[str] | None = None,
unsafe_trusted_ids: bool = False,
schema: type[Schema] | None = None,
_stacklevel: int = 1,
)
| 638 | ) |
| 639 | |
| 640 | def table_from_pandas( |
| 641 | self, |
| 642 | df: pd.DataFrame, |
| 643 | id_from: list[str] | None = None, |
| 644 | unsafe_trusted_ids: bool = False, |
| 645 | schema: type[Schema] | None = None, |
| 646 | _stacklevel: int = 1, |
| 647 | ) -> Table: |
| 648 | """A function for creating a table from a pandas DataFrame. If the DataFrame |
| 649 | contains a column ``_time``, rows will be split into batches with timestamps from ``_time`` column. |
| 650 | Then ``_worker`` column will be interpreted as the id of a worker which will process the row and |
| 651 | ``_diff`` column as an event type with ``1`` treated as inserting row and ``-1`` as removing. |
| 652 | """ |
| 653 | if schema is None: |
| 654 | schema = schema_from_pandas( |
| 655 | df, exclude_columns={"_time", "_diff", "_worker"} |
| 656 | ) |
| 657 | schema, api_schema = read_schema(schema) |
| 658 | value_fields: list[api.ValueField] = api_schema["value_fields"] |
| 659 | |
| 660 | if "_time" not in df: |
| 661 | df["_time"] = [2] * len(df) |
| 662 | if "_worker" not in df: |
| 663 | df["_worker"] = [0] * len(df) |
| 664 | if "_diff" not in df: |
| 665 | df["_diff"] = [1] * len(df) |
| 666 | |
| 667 | batches: dict[ |
| 668 | int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]] |
| 669 | ] = {} |
| 670 | |
| 671 | ids = api.ids_from_pandas( |
| 672 | df, api.ConnectorProperties(unsafe_trusted_ids=unsafe_trusted_ids), id_from |
| 673 | ) |
| 674 | |
| 675 | for row_index in range(len(df)): |
| 676 | row = df.iloc[row_index] |
| 677 | time = row["_time"] |
| 678 | key = ids[df.index[row_index]] |
| 679 | worker = row["_worker"] |
| 680 | |
| 681 | if time not in batches: |
| 682 | batches[time] = {} |
| 683 | |
| 684 | if worker not in batches[time]: |
| 685 | batches[time][worker] = [] |
| 686 | |
| 687 | values = [] |
| 688 | for value_field in value_fields: |
| 689 | column = value_field.name |
| 690 | value = api.denumpify(row[column]) |
| 691 | values.append(value) |
| 692 | diff = row["_diff"] |
| 693 | |
| 694 | batches[time][worker].append((diff, key, values)) |
| 695 | |
| 696 | return self._table_from_dict(batches, schema, _stacklevel=_stacklevel + 1) |
| 697 |