Tokenize user input into GraphNode objects.

Note: This is similar to `convert_legacy_task` but does not

- compare any values to a global set of known keys to infer references/futures
- parse tuples and interpret them as runnable tasks
- deal with SubgraphCallables

Parameters: (obj: Any)
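To make the second point of the note concrete: in the legacy graph format, a tuple whose first element is callable denotes a task. The sketch below uses two hypothetical helpers, `is_legacy_task` and `run_legacy` (illustrative names, not part of the library), to show what "interpreting a tuple as a runnable task" means. It is a minimal illustration under that assumption, not the library's own conversion logic.

```python
from operator import add


def is_legacy_task(x):
    """Hypothetical helper: a legacy task is a non-empty tuple led by a callable."""
    return type(x) is tuple and len(x) > 0 and callable(x[0])


def run_legacy(x):
    """Hypothetical helper: evaluate a legacy task tuple recursively."""
    if is_legacy_task(x):
        func, *args = x
        # Arguments may themselves be nested task tuples.
        return func(*(run_legacy(a) for a in args))
    # Anything else is plain data and is returned unchanged.
    return x


candidate = (add, 1, (add, 2, 3))
print(is_legacy_task(candidate))  # the tuple looks like a legacy task
print(run_legacy(candidate))      # evaluating it calls add twice
```

Going by the code shown for `parse_input`, a tuple such as `(add, 1, 2)` contains no GraphNode and no future, so it would be returned unchanged as data rather than executed.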
    def parse_input(obj: Any) -> object:
        """Tokenize user input into GraphNode objects

        Note: This is similar to `convert_legacy_task` but does not
        - compare any values to a global set of known keys to infer references/futures
        - parse tuples and interpret them as runnable tasks
        - deal with SubgraphCallables

        Parameters
        ----------
        obj : object
            The input object to tokenize

        Returns
        -------
        GraphNode or the original object
            The tokenized object or the original input if no tokenization is needed
        """
        # Already a graph node: nothing to do.
        if isinstance(obj, GraphNode):
            return obj

        # A future is replaced by a reference to its key.
        if _is_dask_future(obj):
            return Alias(obj.key)

        if isinstance(obj, dict):
            parsed_dict = {k: parse_input(v) for k, v in obj.items()}
            # Wrap the dict only if at least one value was tokenized.
            if any(isinstance(v, GraphNode) for v in parsed_dict.values()):
                return Dict(parsed_dict)

        if isinstance(obj, (list, set, tuple)):
            parsed_collection = tuple(parse_input(o) for o in obj)
            # Wrap the collection only if at least one element was tokenized.
            if any(isinstance(o, GraphNode) for o in parsed_collection):
                if isinstance(obj, list):
                    return List(*parsed_collection)
                if isinstance(obj, set):
                    return Set(*parsed_collection)
                if isinstance(obj, tuple):
                    # Namedtuples are handled separately from plain tuples.
                    if is_namedtuple_instance(obj):
                        return _wrap_namedtuple_task(None, obj, parse_input)
                    return Tuple(*parsed_collection)

        # No graph nodes found anywhere: return the input unchanged.
        return obj


    def _wrap_namedtuple_task(k, obj, parser):
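The recursion in `parse_input` can be tried out without the library by using stand-ins. The sketch below is an assumption-laden approximation: `GraphNode`, `Alias`, `Container`, `FakeFuture`, and `sketch_parse_input` are hypothetical names defined here for illustration. A single `Container` class replaces the separate `Dict`, `List`, `Set`, and `Tuple` wrappers, and namedtuple handling is omitted.

```python
from dataclasses import dataclass
from typing import Any


class GraphNode:
    """Hypothetical stand-in for the graph node base class."""


@dataclass(frozen=True)
class Alias(GraphNode):
    """Stand-in reference to another key in the graph."""
    key: str


@dataclass(frozen=True)
class Container(GraphNode):
    """Stand-in for the Dict/List/Set/Tuple wrappers, tagged by kind."""
    kind: str
    items: tuple


class FakeFuture:
    """Hypothetical future-like object that only carries a key."""

    def __init__(self, key: str) -> None:
        self.key = key


def sketch_parse_input(obj: Any) -> Any:
    """Mirror the control flow of parse_input using the stand-ins above."""
    if isinstance(obj, GraphNode):
        return obj
    if isinstance(obj, FakeFuture):
        return Alias(obj.key)
    if isinstance(obj, dict):
        parsed = {k: sketch_parse_input(v) for k, v in obj.items()}
        if any(isinstance(v, GraphNode) for v in parsed.values()):
            return Container("dict", tuple(parsed.items()))
    if isinstance(obj, (list, set, tuple)):
        parsed_items = tuple(sketch_parse_input(o) for o in obj)
        if any(isinstance(o, GraphNode) for o in parsed_items):
            return Container(type(obj).__name__, parsed_items)
    # Nothing was tokenized, so the original object is passed through.
    return obj


plain = [1, (2, 3), {"a": 4}]
print(sketch_parse_input(plain) is plain)        # plain data is returned as-is
print(sketch_parse_input([1, FakeFuture("x")]))  # the list is wrapped
```

The point of the design is that a container is rebuilt only when something inside it was tokenized. Plain data keeps its identity, while a single future nested anywhere causes every enclosing container to be wrapped.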