MCPcopy
hub / github.com/pathwaycom/pathway / __call__

Method __call__

python/pathway/internals/operator.py:349–426  ·  view source on GitHub ↗
(self, **kwargs)

Source from the content-addressed store, hash-verified

347 self._universe_mapping = defaultdict(Universe)
348
349 def __call__(self, **kwargs):
350 input = as_arg_tuple(kwargs)
351
352 input_copy = ArgTuple.empty()
353 iterated_with_universe_copy = ArgTuple.empty()
354
355 # unwrap input and materialize input copy
356 for name, arg in input.items():
357 if isinstance(arg, pw.Table):
358 input_copy[name] = self._copy_input_table(name, arg, unique=False)
359 elif isinstance(arg, iterate_universe):
360 iterated_with_universe_copy[name] = self._copy_input_table(
361 name, arg.table, unique=True
362 )
363 input[name] = arg.table
364 else:
365 raise TypeError(f"{name} has to be a Table instead of {type(arg)}")
366
367 assert all(isinstance(table, pw.Table) for table in input)
368
369 # call iteration logic with copied input and sort result by input order
370 raw_result = self.func_spec.func(**input_copy, **iterated_with_universe_copy)
371 arg_tuple = as_arg_tuple(raw_result)
372 result = arg_tuple.process_input(input)
373 if not iterated_with_universe_copy.is_key_subset_of(result):
374 raise ValueError(
375 "not all arguments marked as iterated returned from iteration"
376 )
377 for name, table in result.items():
378 input_table: pw.Table = input[name]
379 assert isinstance(table, pw.Table)
380 input_schema = input_table.schema._dtypes()
381 result_schema = table.schema._dtypes()
382 if input_schema != result_schema:
383 raise ValueError(
384 f"output: {result_schema} of the iterated function does not correspond to the input: {input_schema}" # noqa
385 )
386 table._sort_columns_by_other(input_table)
387
388 # designate iterated arguments
389 self.iterated_with_universe = input.intersect_keys(iterated_with_universe_copy)
390 self.iterated = input.intersect_keys(result).subtract_keys(
391 iterated_with_universe_copy
392 )
393 self.extra = input.subtract_keys(result)
394
395 # do the same for proxied arguments
396 self.iterated_with_universe_copy = iterated_with_universe_copy
397 self.iterated_copy = input_copy.intersect_keys(result).subtract_keys(
398 iterated_with_universe_copy
399 )
400 self.extra_copy = input_copy.subtract_keys(self.iterated_copy)
401
402 # prepare iteration result
403 self.result_iterated_with_universe = result.intersect_keys(
404 iterated_with_universe_copy
405 )
406 self.result_iterated = result.subtract_keys(iterated_with_universe_copy)

Callers

nothing calls this directly

Calls 15

_copy_input_table — Method · 0.95
as_arg_tuple — Function · 0.90
Universe — Class · 0.90
items — Method · 0.80
is_key_subset_of — Method · 0.80
_dtypes — Method · 0.80
intersect_keys — Method · 0.80
subtract_keys — Method · 0.80
_materialize — Method · 0.80
with_same_order — Method · 0.80
_prepare_inputs — Method · 0.80

Tested by

no test coverage detected