(
self,
table: pw.Table,
key: pw.ColumnExpression,
behavior: Behavior | None,
instance: pw.ColumnExpression | None,
)
| 270 | |
| 271 | @check_arg_types |
| 272 | def _apply( |
| 273 | self, |
| 274 | table: pw.Table, |
| 275 | key: pw.ColumnExpression, |
| 276 | behavior: Behavior | None, |
| 277 | instance: pw.ColumnExpression | None, |
| 278 | ) -> pw.GroupedTable: |
| 279 | check_joint_types( |
| 280 | { |
| 281 | "time_expr": (key, TimeEventType), |
| 282 | "window.hop": (self.hop, IntervalType), |
| 283 | "window.duration": (self.duration, IntervalType), |
| 284 | "window.origin": (self.origin, TimeEventType), |
| 285 | } |
| 286 | ) |
| 287 | |
| 288 | target = _assign_sliding_windows( |
| 289 | table, |
| 290 | key=key, |
| 291 | instance=instance, |
| 292 | window=self, |
| 293 | with_original_id=True, |
| 294 | ) |
| 295 | |
| 296 | if behavior is not None: |
| 297 | if isinstance(behavior, ExactlyOnceBehavior): |
| 298 | duration: IntervalType |
| 299 | # that is split in two if-s, as it helps mypy figure out proper types |
| 300 | # one if impl left either self.ratio or self.duration as optionals |
| 301 | # which won't fit into the duration variable of type IntervalType |
| 302 | if self.duration is not None: |
| 303 | duration = self.duration |
| 304 | elif self.ratio is not None: |
| 305 | duration = self.ratio * self.hop |
| 306 | shift = ( |
| 307 | behavior.shift |
| 308 | if behavior.shift is not None |
| 309 | else zero_length_interval(type(duration)) |
| 310 | ) |
| 311 | behavior = common_behavior( |
| 312 | duration + shift, shift, True # type:ignore |
| 313 | ) |
| 314 | elif not isinstance(behavior, CommonBehavior): |
| 315 | raise ValueError( |
| 316 | f"behavior {behavior} unsupported in sliding/tumbling window" |
| 317 | ) |
| 318 | |
| 319 | if behavior.cutoff is not None: |
| 320 | cutoff_threshold = pw.this._pw_window_end + behavior.cutoff |
| 321 | target = target._freeze(cutoff_threshold, pw.this._pw_key) |
| 322 | if behavior.delay is not None: |
| 323 | target = target._buffer( |
| 324 | target._pw_window_start + behavior.delay, target._pw_key |
| 325 | ) |
| 326 | target = target.with_columns( |
| 327 | _pw_key=pw.if_else( |
| 328 | target._pw_key > target._pw_window_start + behavior.delay, |
| 329 | target._pw_key, |
Nothing calls this directly.
No test coverage was detected.