hub / github.com/pathwaycom/pathway / _table_from_dict

Method _table_from_dict

python/pathway/debug/__init__.py:523–579

A function that creates a table from a mapping of timestamps to batches. Each batch is a mapping from worker id to a list of rows processed in this batch by this worker, and each row is a tuple (diff, key, values). Note: unless you need to specify timestamps and keys, consider using `table_from_list_of_batches` and `table_from_list_of_batches_by_workers`.
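
To make the nesting described above concrete, here is a minimal sketch of a `batches` value built from plain Python objects. The string keys stand in for `api.Pointer` objects and the `Row` alias is hypothetical; neither comes from Pathway, and the row values are made up for illustration.

```python
# Hypothetical alias for one row: (diff, key, values).
# A str stands in for api.Pointer in this sketch.
Row = tuple[int, str, list]

# timestamp -> worker id -> rows processed by that worker in that batch
batches: dict[int, dict[int, list[Row]]] = {
    2: {
        0: [(1, "key-a", ["alice", 10])],   # worker 0 inserts one row
        1: [(1, "key-b", ["bob", 20])],     # worker 1 inserts another row
    },
    4: {
        0: [(-1, "key-a", ["alice", 10])],  # worker 0 later deletes its row
    },
}

# The set of workers is collected across all batches, as in the method body.
workers = {worker for batch in batches.values() for worker in batch}
```

Even timestamps are used here because the method doubles all timestamps when any of them is odd.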

(
        self,
        batches: dict[int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]],
        schema: type[Schema],
        _stacklevel: int = 1,
    )
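
The `diff` field of each row must be 1 or -1: the method turns 1 into an insert event and -1 into a delete event, and raises `ValueError` for anything else. The sketch below is a simplified model of that rule, not Pathway's engine. `replay` is a hypothetical helper that applies rows in timestamp order to a plain dictionary, with strings standing in for `api.Pointer` keys.

```python
def replay(batches):
    """Hypothetical helper: apply (diff, key, values) rows in timestamp order."""
    state = {}
    for timestamp in sorted(batches):
        for worker, changes in batches[timestamp].items():
            for diff, key, values in changes:
                if diff == 1:
                    state[key] = values      # insertion adds the row
                elif diff == -1:
                    state.pop(key, None)     # deletion removes the row
                else:
                    raise ValueError("only diffs of 1 and -1 are supported")
    return state


final = replay({
    2: {0: [(1, "key-a", ["alice", 10])], 1: [(1, "key-b", ["bob", 20])]},
    4: {0: [(-1, "key-a", ["alice", 10])]},
})
```

After both batches, only the row inserted by worker 1 remains in `final`.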

Source from the content-addressed store, hash-verified

def _table_from_dict(
    self,
    batches: dict[int, dict[int, list[tuple[int, api.Pointer, list[api.Value]]]]],
    schema: type[Schema],
    _stacklevel: int = 1,
) -> Table:
    """A function that creates a table from a mapping of timestamps to batches. Each batch
    is a mapping from worker id to list of rows processed in this batch by this worker,
    and each row is tuple (diff, key, values).

    Note: unless you need to specify timestamps and keys, consider using
    `table_from_list_of_batches` and `table_from_list_of_batches_by_workers`.

    Args:
        batches: dictionary with specified batches to be put in the table
        schema: schema of the table
    """
    unique_name = self._get_next_unique_name()
    workers = {worker for batch in batches.values() for worker in batch}
    for worker in workers:
        self.events[(unique_name, worker)] = []

    timestamps = set(batches.keys())

    if any(timestamp for timestamp in timestamps if timestamp < 0):
        raise ValueError("negative timestamp cannot be used")
    elif any(timestamp for timestamp in timestamps if timestamp == 0):
        warn(
            "rows with timestamp 0 are only backfilled and are not processed by output connectors"
        )

    if any(timestamp for timestamp in timestamps if timestamp % 2 == 1):
        warn(
            "timestamps are required to be even; all timestamps will be doubled",
            stacklevel=_stacklevel + 1,
        )
        batches = {2 * timestamp: batches[timestamp] for timestamp in batches}

    for timestamp in sorted(batches):
        self._advance_time_for_all_workers(unique_name, workers, timestamp)
        batch = batches[timestamp]
        for worker, changes in batch.items():
            for diff, key, values in changes:
                if diff == 1:
                    event = api.SnapshotEvent.insert(key, values)
                    self.events[(unique_name, worker)] += [event] * diff
                elif diff == -1:
                    event = api.SnapshotEvent.delete(key, values)
                    self.events[(unique_name, worker)] += [event] * (-diff)
                else:
                    raise ValueError("only diffs of 1 and -1 are supported")

    return read(
        _EmptyConnectorSubject(datasource_name="debug.stream-generator"),
        name=unique_name,
        schema=schema,
    )
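
The timestamp handling in the listing can be tried in isolation. The sketch below reimplements the negative-timestamp check and the doubling step on plain dictionaries; `normalize_timestamps` is a hypothetical helper, not part of Pathway. It also illustrates a detail of the zero check: `any()` tests the truthiness of the yielded values, so a generator of the form `any(t for t in ts if t == 0)` yields only 0, which is falsy. As written, that expression is therefore False even when a zero timestamp is present, and the zero-timestamp warning appears not to be reached.

```python
import warnings


def normalize_timestamps(batches: dict[int, dict]) -> dict[int, dict]:
    """Hypothetical helper mirroring the timestamp checks in the listing."""
    timestamps = set(batches)

    # Negative timestamps are rejected outright.
    if any(t < 0 for t in timestamps):
        raise ValueError("negative timestamp cannot be used")

    # If any timestamp is odd, every timestamp is doubled so all become even.
    if any(t % 2 == 1 for t in timestamps):
        warnings.warn(
            "timestamps are required to be even; all timestamps will be doubled"
        )
        batches = {2 * t: batch for t, batch in batches.items()}

    return batches


# Suppress the warning only to keep this demonstration quiet.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    doubled = normalize_timestamps({1: {}, 2: {}, 5: {}})

# any() looks at the yielded values themselves, and 0 is falsy.
zero_check_as_written = any(t for t in {0, 2} if t == 0)
# Yielding the comparison result instead gives the intended answer.
zero_check_on_condition = any(t == 0 for t in {0, 2})
```

Note that doubling applies to every timestamp, including those that were already even, so the relative order of batches is preserved.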

Calls 8

_get_next_unique_name · Method · 0.95
read · Function · 0.90
any · Function · 0.85
values · Method · 0.80
items · Method · 0.80
keys · Method · 0.45