A function that creates a table from a mapping of timestamps to batches. Each batch is a mapping from worker id to list of rows processed in this batch by this worker, and each row is a tuple (diff, key, values). Note: unless you need to specify timestamps and keys, consider using `table_from_list_of_batches` and `table_from_list_of_batches_by_workers`.
(
self,
batches: dict[int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]],
schema: type[Schema],
_stacklevel: int = 1,
)
| 521 | ) |
| 522 | |
def _table_from_dict(
    self,
    batches: dict[int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]],
    schema: type[Schema],
    _stacklevel: int = 1,
) -> Table:
    """A function that creates a table from a mapping of timestamps to batches. Each batch
    is a mapping from worker id to list of rows processed in this batch by this worker,
    and each row is tuple (diff, key, values).

    Note: unless you need to specify timestamps and keys, consider using
    `table_from_list_of_batches` and `table_from_list_of_batches_by_workers`.

    Timestamps must be non-negative. A timestamp of 0 triggers a warning, because
    such rows are only backfilled. If any timestamp is odd, a warning is emitted and
    every timestamp is doubled so that all of them become even.

    Args:
        batches: dictionary with specified batches to be put in the table
        schema: schema of the table
        _stacklevel: stack level of the caller, used so that warnings point at the
            user's call site rather than at this method

    Returns:
        The table produced by ``read`` for a debug connector registered under a fresh
        unique name. The events recorded in ``self.events`` under that same name are
        presumably what the connector replays -- confirm against ``read`` /
        ``_EmptyConnectorSubject``.

    Raises:
        ValueError: if any timestamp is negative, or if any row has a diff other
            than 1 or -1.
    """
    unique_name = self._get_next_unique_name()
    workers = {worker for batch in batches.values() for worker in batch}
    for worker in workers:
        self.events[(unique_name, worker)] = []

    timestamps = set(batches)

    # Use explicit predicates rather than the truthiness of the timestamps
    # themselves: a matching timestamp of 0 is falsy, so ``any`` over the matching
    # values would never report it.
    if any(timestamp < 0 for timestamp in timestamps):
        raise ValueError("negative timestamp cannot be used")
    elif 0 in timestamps:
        warn(
            "rows with timestamp 0 are only backfilled and are not processed by output connectors",
            stacklevel=_stacklevel + 1,
        )

    if any(timestamp % 2 == 1 for timestamp in timestamps):
        warn(
            "timestamps are required to be even; all timestamps will be doubled",
            stacklevel=_stacklevel + 1,
        )
        batches = {2 * timestamp: batch for timestamp, batch in batches.items()}

    for timestamp in sorted(batches):
        # Every worker is advanced to this timestamp before its rows are recorded.
        self._advance_time_for_all_workers(unique_name, workers, timestamp)
        for worker, changes in batches[timestamp].items():
            worker_events = self.events[(unique_name, worker)]
            for diff, key, values in changes:
                if diff == 1:
                    worker_events.append(api.SnapshotEvent.insert(key, values))
                elif diff == -1:
                    worker_events.append(api.SnapshotEvent.delete(key, values))
                else:
                    raise ValueError("only diffs of 1 and -1 are supported")

    return read(
        _EmptyConnectorSubject(datasource_name="debug.stream-generator"),
        name=unique_name,
        schema=schema,
    )
| 580 |