| 388 | |
| 389 | |
| 390 | class ArrayWriter: |
| 391 | __slots__ = ("lock", "regions", "sources", "targets") |
| 392 | |
| 393 | def __init__(self, lock=None): |
| 394 | self.sources = [] |
| 395 | self.targets = [] |
| 396 | self.regions = [] |
| 397 | self.lock = lock |
| 398 | |
| 399 | def add(self, source, target, region=None): |
| 400 | if is_chunked_array(source): |
| 401 | self.sources.append(source) |
| 402 | self.targets.append(target) |
| 403 | self.regions.append(region) |
| 404 | elif region: |
| 405 | target[region] = source |
| 406 | else: |
| 407 | target[...] = source |
| 408 | |
| 409 | def sync(self, compute=True, chunkmanager_store_kwargs=None): |
| 410 | if self.sources: |
| 411 | chunkmanager = get_chunked_array_type(*self.sources) |
| 412 | |
| 413 | # TODO: consider wrapping targets with dask.delayed, if this makes |
| 414 | # for any discernible difference in performance, e.g., |
| 415 | # targets = [dask.delayed(t) for t in self.targets] |
| 416 | |
| 417 | if chunkmanager_store_kwargs is None: |
| 418 | chunkmanager_store_kwargs = {} |
| 419 | |
| 420 | delayed_store = chunkmanager.store( |
| 421 | self.sources, |
| 422 | self.targets, |
| 423 | lock=self.lock, |
| 424 | compute=compute, |
| 425 | flush=True, |
| 426 | regions=self.regions, |
| 427 | **chunkmanager_store_kwargs, |
| 428 | ) |
| 429 | self.sources = [] |
| 430 | self.targets = [] |
| 431 | self.regions = [] |
| 432 | return delayed_store |
| 433 | |
| 434 | |
| 435 | class AbstractWritableDataStore(AbstractDataStore): |
no outgoing calls
no test coverage detected
searching dependent graphs…