MCPcopy Index your code
hub / github.com/pathwaycom/pathway / __init__

Method __init__

python/pathway/internals/schema.py:288–315  ·  view source on GitHub ↗
(
        self,
        name: str,
        bases: tuple[type],
        namespace: dict[str, Any],
        append_only: bool | None = None,
        id_dtype: dt.DType = dt.ANY_POINTER,
        id_append_only: bool | None = None,
        **kwargs,
    )

Source from the content-addressed store, hash-verified

286
287 @trace.trace_user_frame
288 def __init__(
289 self,
290 name: str,
291 bases: tuple[type],
292 namespace: dict[str, Any],
293 append_only: bool | None = None,
294 id_dtype: dt.DType = dt.ANY_POINTER,
295 id_append_only: bool | None = None,
296 **kwargs,
297 ) -> None:
298 super().__init__(name, bases, namespace, **kwargs)
299 schema_properties = SchemaProperties(append_only=append_only)
300 self.__columns__ = _create_column_definitions(self, bases, schema_properties)
301 pk_dtypes = [col.dtype for col in self.__columns__.values() if col.primary_key]
302 if len(pk_dtypes) > 0:
303 derived_type = dt.Pointer(*pk_dtypes)
304 assert id_dtype in [derived_type, dt.ANY_POINTER]
305 id_dtype = derived_type
306 self.__universe_properties__ = _universe_properties(
307 list(self.__columns__.values()),
308 schema_properties,
309 dtype=id_dtype,
310 append_only=id_append_only,
311 )
312 self.__dtypes__ = {
313 name: column.dtype for name, column in self.__columns__.items()
314 }
315 self.__types__ = {k: v.typehint for k, v in self.__dtypes__.items()}
316
317 def __call__(self) -> NoReturn:
318 raise TypeError(

Callers

nothing calls this directly

Calls 5

SchemaPropertiesClass · 0.85
_universe_propertiesFunction · 0.85
valuesMethod · 0.80
itemsMethod · 0.80

Tested by

no test coverage detected