(
subject: ConnectorSubject,
*,
schema: type[Schema],
autocommit_duration_ms: int | None = 1500,
name: str | None = None,
max_backlog_size: int | None = None,
_stacklevel: int = 1,
**kwargs,
)
| 329 | |
| 330 | |
| 331 | def _create_python_datasource( |
| 332 | subject: ConnectorSubject, |
| 333 | *, |
| 334 | schema: type[Schema], |
| 335 | autocommit_duration_ms: int | None = 1500, |
| 336 | name: str | None = None, |
| 337 | max_backlog_size: int | None = None, |
| 338 | _stacklevel: int = 1, |
| 339 | **kwargs, |
| 340 | ) -> datasource.GenericDataSource: |
| 341 | schema, api_schema = read_schema(schema) |
| 342 | data_format = api.DataFormat( |
| 343 | **api_schema, |
| 344 | format_type="transparent", |
| 345 | session_type=subject._session_type, |
| 346 | ) |
| 347 | if max_backlog_size is not None: |
| 348 | subject._set_max_backlog_size(max_backlog_size) |
| 349 | data_storage = api.DataStorage( |
| 350 | storage_type="python", |
| 351 | python_subject=api.PythonSubject( |
| 352 | start=subject.start, |
| 353 | seek=subject.seek, |
| 354 | on_persisted_run=subject.on_persisted_run, |
| 355 | read=subject._read, |
| 356 | end=subject.end, |
| 357 | is_internal=subject._is_internal(), |
| 358 | deletions_enabled=subject._deletions_enabled, |
| 359 | ), |
| 360 | ) |
| 361 | data_source_options = datasource.DataSourceOptions( |
| 362 | commit_duration_ms=autocommit_duration_ms, |
| 363 | max_backlog_size=max_backlog_size, |
| 364 | unique_name=_get_unique_name(name, kwargs, stacklevel=_stacklevel + 1), |
| 365 | ) |
| 366 | return datasource.GenericDataSource( |
| 367 | datastorage=data_storage, |
| 368 | dataformat=data_format, |
| 369 | data_source_options=data_source_options, |
| 370 | schema=schema, |
| 371 | datasource_name=subject._datasource_name, |
| 372 | append_only=not subject._deletions_enabled, |
| 373 | ) |
| 374 | |
| 375 | |
| 376 | def _are_deletions_reachable(subject) -> bool: |
no test coverage detected