MCPcopy
hub / github.com/dask/dask / unpack_collections

Function unpack_collections

dask/delayed.py:115–293  ·  view source on GitHub ↗

Normalize a python object and merge all sub-graphs. - Replace ``Delayed`` with their keys - Convert literals to things the schedulers can handle - Extract dask graphs from all enclosed values. Note, that the returned _task_ is not necessarily runnable and the caller is responsi

(expr, _return_collections=True)

Source from the content-addressed store, hash-verified

113
114
115def unpack_collections(expr, _return_collections=True):
116 """Normalize a python object and merge all sub-graphs.
117
118 - Replace ``Delayed`` with their keys
119 - Convert literals to things the schedulers can handle
120 - Extract dask graphs from all enclosed values.
121
122 Note, that the returned _task_ is not necessarily runnable and the caller is
123 responsible to deal with the output types accordingly.
124
125 The task is one of
126 - `TaskRef` as a pointer to the collection returned in collections. This is
127 not callable and should not be a top-level member of a dask task graph.
128 - A runnable task (i.e. subclass `GraphNode`) which can be embedded
129 directly into a task graph. This indicates that a dask collection was
130 encountered on a deeper nesting level and this runnable task restores the
131 input nesting with the computed dask collection replaced.
132 - The unaltered object as provided if no dask collections are found.
133
134 Parameters
135 ----------
136 expr : object
137 The object to be normalized. This function knows how to handle
138 dask collections, as well as most builtin python types.
139
140 _optimize_collections: bool, optional
141 Internal use only!
142
143
144 Returns
145 -------
146 task : object
147 collections : a tuple of collections
148
149 Examples
150 --------
151 >>> import dask
152 >>> a = delayed(1, 'a')
153 >>> b = delayed(2, 'b')
154 >>> task, collections = unpack_collections([a, b, 3])
155 >>> task
156 List((TaskRef('a'), TaskRef('b'), 3))
157 >>> collections
158 (Delayed('a'), Delayed('b'))
159
160 >>> task, collections = unpack_collections({a: 1, b: 2})
161 >>> task
162 Dict(a: 1, b: 2)
163 >>> collections
164 (Delayed('a'), Delayed('b'))
165 """
166 if isinstance(expr, Delayed):
167 if _return_collections:
168 return TaskRef(expr._key), (expr,)
169 else:
170 expr = collections_to_expr(expr).finalize_compute()
171 (name,) = expr.__dask_keys__()
172 return TaskRef(name), (expr,)

Callers 7

map_partitionsFunction · 0.90
histogramFunction · 0.90
blockwiseFunction · 0.90
_layerMethod · 0.90
delayedFunction · 0.70
call_functionFunction · 0.70

Calls 15

TaskRefClass · 0.90
collections_to_exprFunction · 0.90
ProhibitReuseClass · 0.90
flattenFunction · 0.90
ListClass · 0.90
TaskClass · 0.90
DictClass · 0.90
DelayedClass · 0.85
setClass · 0.85
unzipMethod · 0.80
finalize_computeMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…