| 231 | |
| 232 | |
| 233 | class Tuple(DaskMethodsMixin): |
| 234 | __slots__ = ("_dask", "_keys") |
| 235 | __dask_scheduler__ = staticmethod(dask.threaded.get) |
| 236 | __dask_optimize__ = None |
| 237 | |
| 238 | def __init__(self, dsk, keys): |
| 239 | self._dask = dsk |
| 240 | self._keys = keys |
| 241 | |
| 242 | def __add__(self, other): |
| 243 | if not isinstance(other, Tuple): |
| 244 | return NotImplemented # pragma: nocover |
| 245 | return Tuple(merge(self._dask, other._dask), self._keys + other._keys) |
| 246 | |
| 247 | def __dask_graph__(self): |
| 248 | return self._dask |
| 249 | |
| 250 | def __dask_keys__(self): |
| 251 | return self._keys |
| 252 | |
| 253 | def __dask_layers__(self): |
| 254 | return tuple(get_collection_names(self)) |
| 255 | |
| 256 | def __dask_tokenize__(self): |
| 257 | return self._keys |
| 258 | |
| 259 | def __dask_postcompute__(self): |
| 260 | return tuple, () |
| 261 | |
| 262 | def __dask_postpersist__(self): |
| 263 | return Tuple._rebuild, (self._keys,) |
| 264 | |
| 265 | @staticmethod |
| 266 | def _rebuild(dsk, keys, *, rename=None): |
| 267 | if rename: |
| 268 | keys = [replace_name_in_key(key, rename) for key in keys] |
| 269 | return Tuple(dsk, keys) |
| 270 | |
| 271 | |
| 272 | def test_custom_collection(): |
no outgoing calls
searching dependent graphs…