MCPcopy Index your code
hub / github.com/pathwaycom/pathway / DataSource

Class DataSource

python/pathway/internals/datasource.py:32–69  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

30
@dataclass(frozen=True, kw_only=True)
class DataSource(ABC):
    """Abstract description of a data source: a schema plus its options.

    Concrete subclasses must implement ``is_bounded`` and ``is_append_only``.
    """

    # Schema class whose columns describe this source.
    schema: type[Schema]
    # Options copied into ``connector_properties``; a default-constructed
    # ``DataSourceOptions`` is used when none is supplied.
    data_source_options: DataSourceOptions = DataSourceOptions()

    @property
    def connector_properties(self) -> api.ConnectorProperties:
        """Assemble the ``api.ConnectorProperties`` for this source.

        Each schema column contributes one ``api.ColumnProperties`` holding
        the column's engine dtype and this source's append-only flag. All
        other fields are taken from ``data_source_options``.
        """
        column_properties: list[api.ColumnProperties] = [
            api.ColumnProperties(
                dtype=col.dtype.to_engine(),
                append_only=self.is_append_only(),
            )
            for col in self.schema.columns().values()
        ]

        options = self.data_source_options
        return api.ConnectorProperties(
            commit_duration_ms=options.commit_duration_ms,
            unsafe_trusted_ids=options.unsafe_trusted_ids,
            column_properties=column_properties,
            unique_name=options.unique_name,
            synchronization_group=options.synchronization_group,
            max_backlog_size=options.max_backlog_size,
        )

    def get_effective_schema(self) -> type[Schema]:
        """Return the schema, marked append-only when the source is.

        For a source that is not append-only the stored schema is returned
        unchanged; otherwise the result of
        ``schema.update_properties(append_only=True)`` is returned.
        """
        if not self.is_append_only():
            return self.schema
        return self.schema.update_properties(append_only=True)

    @abstractmethod
    def is_bounded(self) -> bool:
        """Return whether this source is bounded (defined by subclasses)."""

    @abstractmethod
    def is_append_only(self) -> bool:
        """Return whether this source is append-only (defined by subclasses)."""

    @property
    def name(self) -> str:
        """Lower-cased qualified class name without a trailing "datasource"."""
        lowered = type(self).__qualname__.lower()
        return lowered.removesuffix("datasource")
70
71
72class StaticDataSource(DataSource, ABC):

Callers

nothing calls this directly

Calls 1

DataSourceOptionsClass · 0.85

Tested by

no test coverage detected