| 3524 | self.n_processed_rows = 0 |
| 3525 | |
def __call__(self, key, row, time, is_addition):
    """Check one received row against the expected values for its key.

    The expected column values are looked up in ``known_rows`` using
    ``row["pkey"]``, and every expected column is compared with the
    corresponding entry of ``row``. Each invocation also increments
    ``self.n_processed_rows``.

    ``key``, ``time`` and ``is_addition`` are accepted but not used here.

    Raises:
        AssertionError: if any column differs from its expected value.
        KeyError: if ``row["pkey"]`` is absent from ``known_rows`` or an
            expected column is missing from ``row``.
    """
    self.n_processed_rows += 1
    expected_row = known_rows[row["pkey"]]

    for column, expected in expected_row.items():
        # Arrays need an element-wise comparison; a shape check comes first.
        if isinstance(expected, np.ndarray):
            assert row[column].shape == expected.shape
            assert (row[column] == expected).all()
            continue

        acceptable = [expected]
        # With CSV a missing value cannot be parsed unambiguously, so the
        # literal string "None" (or, for binary data, its base64-decoded
        # form) is accepted alongside None itself.
        csv_null = data_format == "csv" and expected is None
        if csv_null and column == "string":
            acceptable.append("None")
        elif csv_null and column == "binary_data":
            acceptable.append(base64.b64decode("None"))

        assert row[column] in acceptable
| 3544 | |
| 3545 | InputSchema = table.schema |
| 3546 | if data_format == "delta": |