Repository: github.com/pathwaycom/pathway

Function replay_csv

python/pathway/demo/__init__.py:212–254

Replay a static CSV file as a data stream, emitting its rows at input_rate rows per second and returning the result as a Pathway table.
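The core replay mechanism can be sketched without Pathway: read rows with `csv.DictReader`, keep only the schema's columns, and pause `1 / input_rate` seconds after each row. This is a minimal standard-library sketch under stated assumptions, not Pathway's code. The generator name `replay_rows`, its `sleep` parameter (injected so the pacing can be tested without waiting) and the explicit `ValueError` for non-positive rates are all hypothetical additions.

```python
import csv
import io
import time
from typing import Callable, Iterable, Iterator, TextIO


def replay_rows(
    stream: TextIO,
    columns: Iterable[str],
    input_rate: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict[str, str]]:
    """Hypothetical helper: yield CSV rows as dicts at about input_rate rows/second."""
    if input_rate <= 0:
        # Hypothetical guard; replay_csv itself would fail with ZeroDivisionError at 0.
        raise ValueError("input_rate must be positive")
    wanted = list(columns)
    # DictReader uses the default "excel" dialect: ',' separator, '"' quotechar.
    for row in csv.DictReader(stream):
        # Keep only the requested columns; a missing one raises KeyError,
        # just like row[key] does in replay_csv.
        yield {key: row[key] for key in wanted}
        # Pace the stream: one row per 1 / input_rate seconds.
        sleep(1.0 / input_rate)


if __name__ == "__main__":
    data = io.StringIO('name,age,city\nAlice,30,"Paris, FR"\nBob,25,Berlin\n')
    for record in replay_rows(data, ["name", "city"], input_rate=10.0):
        print(record)
    # prints {'name': 'Alice', 'city': 'Paris, FR'} then {'name': 'Bob', 'city': 'Berlin'}
```

Note that every value comes out as a string, even `age`; converting to typed values is a separate step.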

(
    path: str | PathLike,
    *,
    schema: type[pw.Schema],
    input_rate: float = 1.0,
)
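In the source listing, input_rate drives two numbers: the pause of `1.0 / input_rate` seconds after each row, and the connector's commit interval, `autocommit_ms = int(1000.0 / input_rate)`. The hypothetical helper `replay_timing` below is not part of Pathway; it only evaluates those two expressions so that the effect of `int()` truncation is visible.

```python
def replay_timing(input_rate: float) -> tuple[float, int]:
    """Hypothetical helper: (seconds slept per row, autocommit interval in ms),
    computed with the same expressions as replay_csv."""
    sleep_s = 1.0 / input_rate                 # pause after each emitted row
    autocommit_ms = int(1000.0 / input_rate)   # int() truncates toward zero
    return sleep_s, autocommit_ms


for rate in (1.0, 4.0, 3.0, 0.5):
    print(rate, replay_timing(rate))
```

The default rate of 1.0 gives a 1000 ms commit interval, 4.0 gives 250 ms, and 3.0 gives 333 ms rather than 333.33 ms. Because the commit interval matches the per-row pause, each commit should carry roughly one row. Rates above 1000 rows per second truncate the interval to 0 ms, and a rate of 0 raises ZeroDivisionError.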


```python
import csv
import time
from os import PathLike

import pathway as pw


def replay_csv(
    path: str | PathLike,
    *,
    schema: type[pw.Schema],
    input_rate: float = 1.0,
) -> pw.Table:
    """Replay a static CSV file as a data stream.

    Args:
        path: Path to the file to stream.
        schema: Schema of the resulting table.
        input_rate (float, optional): The rate at which rows are read, in rows per
            second. Defaults to 1.0. The connector commits at a fixed interval
            derived from it, ``int(1000.0 / input_rate)`` milliseconds; at each
            commit, the updates received by the connector are pushed into Pathway
            Live Data Framework's computation graph.

    Returns:
        Table: The table read.

    Note: the CSV file should follow standard CSV settings: the separator is ',',
    the quotechar is '"', and there is no escape character.
    """

    # Commit about as often as a new row arrives.
    autocommit_ms = int(1000.0 / input_rate)

    columns = set(schema.column_names())

    class FileStreamSubject(pw.io.python.ConnectorSubject):
        def run(self):
            # newline="" lets the csv module handle line breaks inside quoted fields.
            with open(path, newline="") as csvfile:
                csvreader = csv.DictReader(csvfile)
                for row in csvreader:
                    # Keep only the schema's columns; values are still strings here.
                    values = {key: row[key] for key in columns}
                    self.next_json(values)
                    # Throttle to input_rate rows per second.
                    time.sleep(1.0 / input_rate)

    return pw.io.python.read(
        FileStreamSubject(datasource_name="demo.replay-csv"),
        # Read every column as str first, since the CSV reader yields strings...
        schema=schema.with_types(**{name: str for name in schema.column_names()}),
        autocommit_duration_ms=autocommit_ms,
        format="json",
    ).cast_to_types(**schema.typehints())  # ...then cast to the declared types.
```
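The listing declares every column as `str` via `schema.with_types(...)` and only afterwards converts to the declared types with `cast_to_types(**schema.typehints())`, because `csv.DictReader` yields every field as a string. The sketch below imitates that two-step approach in plain Python. The `TYPEHINTS` mapping and the `cast_row` helper are hypothetical stand-ins for a schema's typehints and for Pathway's cast; Pathway's own conversion rules may differ, for example for booleans or missing values.

```python
import csv
import io
from typing import Any

# Hypothetical stand-in for schema.typehints(): column name -> Python type.
TYPEHINTS: dict[str, type] = {"name": str, "age": int, "score": float}


def cast_row(raw: dict[str, str], typehints: dict[str, type]) -> dict[str, Any]:
    """Step 2: convert a row of strings to the declared types."""
    return {col: typ(raw[col]) for col, typ in typehints.items()}


csv_text = "name,age,score\nAlice,30,91.5\nBob,25,78\n"
# Step 1: read everything as str, as the connector does with its all-str schema.
raw_rows = [
    {col: row[col] for col in TYPEHINTS}
    for row in csv.DictReader(io.StringIO(csv_text))
]
typed_rows = [cast_row(r, TYPEHINTS) for r in raw_rows]
print(typed_rows)
# prints [{'name': 'Alice', 'age': 30, 'score': 91.5}, {'name': 'Bob', 'age': 25, 'score': 78.0}]
```

Calling a type constructor directly is only safe for simple types: `bool("False")` is `True` in Python, which is one reason a real cast needs per-type conversion rules.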

Callers

nothing calls this directly

Calls (5)

FileStreamSubject · class · 0.85
cast_to_types · method · 0.80
with_types · method · 0.80
column_names · method · 0.45
typehints · method · 0.45

Tested by

no test coverage detected