Given an iterable of annotations dictionaries, fuse them according to some simple rules.
(*args: dict)
| 1226 | |
| 1227 | |
| 1228 | def _fuse_annotations(*args: dict) -> dict: |
| 1229 | """ |
| 1230 | Given an iterable of annotations dictionaries, fuse them according |
| 1231 | to some simple rules. |
| 1232 | """ |
| 1233 | # First, do a basic dict merge -- we are presuming that these have already |
| 1234 | # been gated by `_can_fuse_annotations`. |
| 1235 | annotations = toolz.merge(*args) |
| 1236 | # Max of layer retries |
| 1237 | retries = [a["retries"] for a in args if "retries" in a] |
| 1238 | if retries: |
| 1239 | annotations["retries"] = max(retries) |
| 1240 | # Max of layer priorities |
| 1241 | priorities = [a["priority"] for a in args if "priority" in a] |
| 1242 | if priorities: |
| 1243 | annotations["priority"] = max(priorities) |
| 1244 | # Max of all the layer resources |
| 1245 | resources = [a["resources"] for a in args if "resources" in a] |
| 1246 | if resources: |
| 1247 | annotations["resources"] = toolz.merge_with(max, *resources) |
| 1248 | # Intersection of all the worker restrictions |
| 1249 | workers = [a["workers"] for a in args if "workers" in a] |
| 1250 | if workers: |
| 1251 | annotations["workers"] = list(set.intersection(*[set(w) for w in workers])) |
| 1252 | # More restrictive of allow_other_workers |
| 1253 | allow_other_workers = [ |
| 1254 | a["allow_other_workers"] for a in args if "allow_other_workers" in a |
| 1255 | ] |
| 1256 | if allow_other_workers: |
| 1257 | annotations["allow_other_workers"] = all(allow_other_workers) |
| 1258 | |
| 1259 | return annotations |
| 1260 | |
| 1261 | |
| 1262 | def rewrite_blockwise(inputs): |
no test coverage detected
searching dependent graphs…