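Replay a static CSV file as a data stream. Args: path: Path to the file to stream. schema: Schema of the resulting table. autocommit_duration_ms: the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway Live Data Framework's computation graph. (Note: this value is not an argument of the signature below; the function derives it from input_rate as int(1000 / input_rate).) input_rate (float, optional): The rate at which rows are read per second. Defaults to 1.0.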
(
path: str | PathLike,
*,
schema: type[pw.Schema],
input_rate: float = 1.0,
)
| 210 | |
| 211 | |
def replay_csv(
    path: str | PathLike,
    *,
    schema: type[pw.Schema],
    input_rate: float = 1.0,
) -> pw.Table:
    """Replay a static CSV file as a data stream.

    Rows are emitted one at a time, sleeping ``1 / input_rate`` seconds after
    each row. The connector's autocommit interval is not a separate argument:
    it is derived from ``input_rate`` as ``int(1000 / input_rate)``
    milliseconds and passed to ``pw.io.python.read`` as
    ``autocommit_duration_ms``.

    Args:
        path: Path to the file to stream.
        schema: Schema of the resulting table. Each column named in the schema
            is looked up by name in every CSV row.
        input_rate (float, optional): The rate at which rows are read per
            second. Must be a positive number. Defaults to 1.0.

    Returns:
        Table: The table read.

    Raises:
        ValueError: If ``input_rate`` is not a positive number.

    Note: the CSV file should follow standard CSV settings. The separator is
    ',', the quotechar is '"', and there is no escape.

    """

    # Validate up front: a zero rate would otherwise surface as a bare
    # ZeroDivisionError below, and a negative (or NaN) rate would only fail
    # later, inside the connector's run(), when time.sleep() rejects the delay.
    # `not (x > 0)` is used instead of `x <= 0` so that NaN is rejected too.
    if not (input_rate > 0):
        raise ValueError(
            f"input_rate must be a positive number, got {input_rate!r}"
        )

    # Both timings are loop-invariant, so compute them once.
    row_delay_s = 1.0 / input_rate
    autocommit_ms = int(1000.0 / input_rate)

    column_names = list(schema.column_names())

    class FileStreamSubject(pw.io.python.ConnectorSubject):
        def run(self):
            # newline="" is what the csv module expects so that it can handle
            # line endings embedded in quoted fields itself.
            with open(path, newline="") as csvfile:
                for row in csv.DictReader(csvfile):
                    # Forward only the schema's columns; extra CSV columns
                    # are dropped.
                    self.next_json({name: row[name] for name in column_names})
                    time.sleep(row_delay_s)

    # Values come out of the CSV reader as strings, so the connector reads
    # every column as `str` and the table is cast to the schema's types after.
    return pw.io.python.read(
        FileStreamSubject(datasource_name="demo.replay-csv"),
        schema=schema.with_types(**{name: str for name in column_names}),
        autocommit_duration_ms=autocommit_ms,
        format="json",
    ).cast_to_types(**schema.typehints())
| 255 | |
| 256 | |
| 257 | def replay_csv_with_time( |
nothing calls this directly
no test coverage detected