MCPcopy
hub / github.com/pathwaycom/pathway / _json_table_from_list

Function _json_table_from_list

python/pathway/tests/test_json.py:27–46  ·  view source on GitHub ↗
(data)

Source from the content-addressed store, hash-verified

25
26
27def _json_table_from_list(data):
28 class _JsonSubject(pw.io.python.ConnectorSubject):
29 def __init__(self, data: list[dict[str, Any]]) -> None:
30 super().__init__()
31 self.data = data
32
33 def run(self) -> None:
34 for key, row in enumerate(self.data):
35 self.next(
36 key=key + 1, **{name: pw.Json(value) for name, value in row.items()}
37 )
38
39 schema = pw.schema_builder(
40 columns={
41 "key": pw.column_definition(dtype=int, primary_key=True),
42 **{name: pw.column_definition(dtype=pw.Json) for name in data[0]},
43 }
44 )
45
46 return pw.io.python.read(_JsonSubject(data), schema=schema).without(pw.this.key)
47
48
49def _json_table(**kwargs) -> pw.Table:

Callers 2

_json_tableFunction · 0.85
test_json_inputFunction · 0.85

Calls 2

_JsonSubjectClass · 0.85
withoutMethod · 0.45

Tested by

no test coverage detected