(collection, split_every)
| 71 | |
| 72 | |
def _checkpoint_one(collection, split_every) -> Delayed:
    """Build a ``Delayed`` that applies ``chunks.checkpoint`` over ``collection``.

    When the flattened ``collection.__dask_keys__()`` yields fewer than two
    keys, a single task named ``checkpoint-<token>`` takes the keys directly.
    Otherwise one map layer is built per name returned by
    ``get_collection_names(collection)`` and the map outputs are folded
    together by a reduce layer whose final key is ``checkpoint-<token>``.

    Parameters
    ----------
    collection
        Object exposing ``__dask_keys__()``; it is also passed to
        ``tokenize``, ``get_collection_names`` and
        ``HighLevelGraph.from_collections``.
    split_every
        Maximum number of keys handed to one intermediate reduce task.
        A falsy value disables intermediate aggregation, so the final task
        receives every map key at once.

    Returns
    -------
    Delayed
        ``Delayed(name, dsk)`` where ``name`` is ``checkpoint-<token>``.

    Raises
    ------
    ValueError
        If aggregation would be entered with a truthy ``split_every`` below 2.
        Such a value can never shrink the pending key list, so the loop would
        otherwise never terminate.
    """
    tok = tokenize(collection)
    name = f"checkpoint-{tok}"

    # Probe for a second key without materialising the whole flattened list.
    keys_iter = flatten(collection.__dask_keys__())
    try:
        next(keys_iter)
        next(keys_iter)
    except StopIteration:
        # Collection has 0 or 1 keys; no need for a map step
        layer: Graph = {name: (chunks.checkpoint, collection.__dask_keys__())}
        dsk = HighLevelGraph.from_collections(name, layer, dependencies=(collection,))
        return Delayed(name, dsk)

    # Collection has 2+ keys; apply a two-step map->reduce algorithm so that we
    # transfer over the network and store in RAM only a handful of None's instead of
    # the full computed collection's contents
    dsks = []
    map_names = set()
    map_keys = []

    for prev_name in get_collection_names(collection):
        map_name = "checkpoint_map-" + tokenize(prev_name, tok)
        map_names.add(map_name)
        map_layer = _build_map_layer(chunks.checkpoint, prev_name, map_name, collection)
        map_keys.extend(map_layer.get_output_keys())
        dsks.append(
            HighLevelGraph.from_collections(
                map_name, map_layer, dependencies=(collection,)
            )
        )

    # Each aggregation step consumes `split_every` keys and emits one, so a
    # truthy fan-in below 2 never makes progress. Fail loudly instead of
    # spinning forever; only raise when the loop below would actually run.
    if split_every and split_every < 2 and len(map_keys) > split_every:
        raise ValueError(
            f"split_every must be falsy or >= 2 for recursive aggregation; "
            f"got {split_every!r}"
        )

    # recursive aggregation
    # `map_keys[head:]` is the queue of keys still waiting to be reduced.
    # Advancing `head` and appending in place avoids re-copying the whole
    # queue on every step while producing the same tasks in the same order.
    reduce_layer: dict = {}
    head = 0
    while split_every and len(map_keys) - head > split_every:
        k = (name, len(reduce_layer))
        reduce_layer[k] = (chunks.checkpoint, map_keys[head : head + split_every])
        head += split_every
        map_keys.append(k)
    reduce_layer[name] = (chunks.checkpoint, map_keys[head:])

    dsks.append(HighLevelGraph({name: reduce_layer}, dependencies={name: map_names}))
    dsk = HighLevelGraph.merge(*dsks)

    return Delayed(name, dsk)
| 117 | |
| 118 | |
| 119 | def _can_apply_blockwise(collection) -> bool: |
no test coverage detected
searching dependent graphs…