MCPcopy
hub / github.com/pathwaycom/pathway / _apply

Method _apply

python/pathway/stdlib/temporal/_window.py:272–356  ·  view source on GitHub ↗
(
        self,
        table: pw.Table,
        key: pw.ColumnExpression,
        behavior: Behavior | None,
        instance: pw.ColumnExpression | None,
    )

Source from the content-addressed store, hash-verified

270
271 @check_arg_types
272 def _apply(
273 self,
274 table: pw.Table,
275 key: pw.ColumnExpression,
276 behavior: Behavior | None,
277 instance: pw.ColumnExpression | None,
278 ) -> pw.GroupedTable:
279 check_joint_types(
280 {
281 "time_expr": (key, TimeEventType),
282 "window.hop": (self.hop, IntervalType),
283 "window.duration": (self.duration, IntervalType),
284 "window.origin": (self.origin, TimeEventType),
285 }
286 )
287
288 target = _assign_sliding_windows(
289 table,
290 key=key,
291 instance=instance,
292 window=self,
293 with_original_id=True,
294 )
295
296 if behavior is not None:
297 if isinstance(behavior, ExactlyOnceBehavior):
298 duration: IntervalType
299 # that is split in two if-s, as it helps mypy figure out proper types
300 # one if impl left either self.ratio or self.duration as optionals
301 # which won't fit into the duration variable of type IntervalType
302 if self.duration is not None:
303 duration = self.duration
304 elif self.ratio is not None:
305 duration = self.ratio * self.hop
306 shift = (
307 behavior.shift
308 if behavior.shift is not None
309 else zero_length_interval(type(duration))
310 )
311 behavior = common_behavior(
312 duration + shift, shift, True # type:ignore
313 )
314 elif not isinstance(behavior, CommonBehavior):
315 raise ValueError(
316 f"behavior {behavior} unsupported in sliding/tumbling window"
317 )
318
319 if behavior.cutoff is not None:
320 cutoff_threshold = pw.this._pw_window_end + behavior.cutoff
321 target = target._freeze(cutoff_threshold, pw.this._pw_key)
322 if behavior.delay is not None:
323 target = target._buffer(
324 target._pw_window_start + behavior.delay, target._pw_key
325 )
326 target = target.with_columns(
327 _pw_key=pw.if_else(
328 target._pw_key > target._pw_window_start + behavior.delay,
329 target._pw_key,

Callers

nothing calls this directly

Calls 9

check_joint_types · Function · 0.85
_assign_sliding_windows · Function · 0.85
zero_length_interval · Function · 0.85
common_behavior · Function · 0.85
_freeze · Method · 0.80
_buffer · Method · 0.80
with_columns · Method · 0.80
_forget · Method · 0.80
groupby · Method · 0.45

Tested by

no test coverage detected