```python
def from_delayed(values):
    """Create a bag from many dask Delayed objects.

    These objects will become the partitions of the resulting Bag. They should
    evaluate to a ``list`` or some other concrete sequence.

    Parameters
    ----------
    values : list of Delayed
        A list of dask Delayed objects (a single Delayed is also accepted),
        each of which should evaluate to a concrete sequence such as a list.

    Returns
    -------
    Bag

    Examples
    --------
    >>> x, y, z = [delayed(load_sequence_from_file)(fn)
    ...            for fn in filenames]  # doctest: +SKIP
    >>> b = from_delayed([x, y, z])  # doctest: +SKIP

    See Also
    --------
    dask.delayed
    """
    # Task, TaskRef, tokenize, HighLevelGraph, reify and Bag are module-level
    # names in dask.bag.core.
    from dask.delayed import Delayed, delayed

    if isinstance(values, Delayed):
        values = [values]
    futures = [v for v in values if isinstance(v, TaskRef)]
    if all_futures := (len(futures) == len(values)):
        # Fast path: every input is already a future (TaskRef).
        values = futures
    else:
        # Every Delayed generates a Layer, so this path is much more expensive
        # when there are many input values.
        values = [
            delayed(v) if not isinstance(v, (Delayed,)) and hasattr(v, "key") else v
            for v in values
        ]

    token = tokenize(*values)
    name = f"bag-from-delayed-{token}"
    tasks = [Task((name, i), reify, TaskRef(v.key)) for i, v in enumerate(values)]
    dsk = {t.key: t for t in tasks}

    graph = HighLevelGraph.from_collections(
        name, dsk, dependencies=values if not all_futures else ()
    )
    return Bag(graph, name, len(values))


def chunk_distinct(seq, key=None):
```
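The branching at the top of `from_delayed` picks between a fast path, where every input is already a `TaskRef` (for example a distributed future) and the graph declares no collection dependencies, and a slower path, where keyed inputs that are not `Delayed` get wrapped with `delayed` so each can be passed as a dependency. Below is a minimal pure-Python sketch of that decision using two hypothetical stand-in types, `Ref` and `Lazy`; neither is part of dask, and `prepare_inputs` is an illustrative name, not a dask API.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for a reference to an existing result (the role TaskRef plays)."""
    key: str


@dataclass(frozen=True)
class Lazy:
    """Hypothetical stand-in for a Delayed: a key plus its own (toy) task graph."""
    key: str
    graph: Tuple[Tuple[str, Any], ...] = ()


def prepare_inputs(values: Any) -> Tuple[List[Any], bool]:
    """Mirror the input normalization in from_delayed on toy types.

    Returns the normalized inputs and whether every input was a Ref
    (the fast path, which contributes no dependency graphs).
    """
    if isinstance(values, Lazy):
        # A single lazy object is treated as a one-element list.
        values = [values]
    # Assumption: materialize first so any iterable works (the listing calls len()).
    values = list(values)
    refs = [v for v in values if isinstance(v, Ref)]
    if len(refs) == len(values):
        # Fast path: nothing needs wrapping.
        return refs, True
    # Slow path: wrap anything with a ``key`` that is not already Lazy,
    # so every keyed input can be treated as a graph dependency.
    wrapped = [
        Lazy(v.key) if not isinstance(v, Lazy) and hasattr(v, "key") else v
        for v in values
    ]
    return wrapped, False
```

Unlike the listing, the sketch calls `list(values)` before counting, so a generator is also accepted; the original calls `len(values)` and therefore needs a sized sequence.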
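After normalizing its inputs, `from_delayed` names the new collection with a token derived from those inputs and creates one task per input, `Task((name, i), reify, TaskRef(v.key))`: partition `i` applies `reify` to the result of input `i`. The sketch below models that mapping with plain tuples and a tiny executor. All helpers here are hypothetical, not dask APIs: `toy_token` is a SHA-256 hash of the input keys standing in for `tokenize`, `reify_sketch` is a simplified `reify` that only turns a top-level iterator into a list, and `build_partitions` and `run` exist only for illustration.

```python
import hashlib
from collections.abc import Iterator
from typing import Any, Callable, Dict, List, Tuple

# A toy task graph: (collection name, partition index) -> (function, dependency key)
Graph = Dict[Tuple[str, int], Tuple[Callable[[Any], Any], str]]


def reify_sketch(seq: Any) -> Any:
    """Simplified stand-in for reify: turn a top-level iterator into a list."""
    if isinstance(seq, Iterator):
        return list(seq)
    return seq


def toy_token(*keys: str) -> str:
    """Hypothetical stand-in for tokenize(): a deterministic hash of the input keys."""
    return hashlib.sha256("|".join(keys).encode()).hexdigest()[:8]


def build_partitions(dep_keys: List[str]) -> Tuple[str, Graph]:
    """Create one task per input; partition i will call reify on input i's result."""
    name = f"bag-from-delayed-{toy_token(*dep_keys)}"
    dsk: Graph = {(name, i): (reify_sketch, key) for i, key in enumerate(dep_keys)}
    return name, dsk


def run(dsk: Graph, results: Dict[str, Any]) -> List[Any]:
    """Tiny executor: apply each task's function to its dependency's result, in partition order."""
    ordered = sorted(dsk.items(), key=lambda kv: kv[0][1])
    return [func(results[dep]) for _, (func, dep) in ordered]
```

Because the name is derived only from the input keys, calling `build_partitions` twice on the same keys produces identical task keys, and each partition is materialized from exactly one upstream result.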