MCPcopy Index your code
hub / github.com/dask/dask / _layer

Method _layer

dask/dataframe/dask_expr/_expr.py:1044–1132  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1042 return self.frame.divisions
1043
1044 def _layer(self) -> dict:
1045 dsk, prevs, nexts = {}, [], [] # type: ignore[var-annotated]
1046
1047 name_prepend = f"overlap-prepend-{self._name}"
1048 if self.before:
1049 prevs.append(None)
1050 if isinstance(self.before, numbers.Integral):
1051 before = self.before
1052 for i in range(self.frame.npartitions - 1):
1053 dsk[(name_prepend, i)] = (M.tail, (self.frame._name, i), before)
1054 prevs.append((name_prepend, i))
1055 elif isinstance(self.before, datetime.timedelta):
1056 # Assumes monotonic (increasing?) index
1057 divs = pd.Series(self.frame.divisions)
1058 deltas = divs.diff().iloc[1:-1]
1059
1060 # In the first case window-size is larger than at least one partition, thus it is
1061 # necessary to calculate how many partitions must be used for each rolling task.
1062 # Otherwise, these calculations can be skipped (faster)
1063
1064 if (self.before > deltas).any():
1065 pt_z = divs[0]
1066 for i in range(self.frame.npartitions - 1):
1067 # Select all indexes of relevant partitions between the current partition and
1068 # the partition with the highest division outside the rolling window (before)
1069 pt_i = divs[i + 1]
1070
1071 # lower-bound the search to the first division
1072 lb = max(pt_i - self.before, pt_z)
1073
1074 first, j = divs[i], i
1075 while first > lb and j > 0:
1076 first = first - deltas[j]
1077 j = j - 1
1078
1079 dsk[(name_prepend, i)] = ( # type: ignore[assignment]
1080 _tail_timedelta,
1081 (self.frame._name, i + 1),
1082 [(self.frame._name, k) for k in range(j, i + 1)],
1083 self.before,
1084 )
1085 prevs.append((name_prepend, i))
1086 else:
1087 for i in range(self.frame.npartitions - 1):
1088 dsk[(name_prepend, i)] = ( # type: ignore[assignment]
1089 _tail_timedelta,
1090 (self.frame._name, i + 1),
1091 [(self.frame._name, i)],
1092 self.before,
1093 )
1094 prevs.append((name_prepend, i))
1095 else:
1096 prevs.extend([None] * self.frame.npartitions) # type: ignore[list-item]
1097
1098 name_append = f"overlap-append-{self._name}"
1099 if self.after:
1100 if isinstance(self.after, numbers.Integral):
1101 after = self.after

Callers

nothing calls this directly

Calls 3

max · Function · 0.85
diff · Method · 0.80
any · Method · 0.45

Tested by

no test coverage detected