Read set of avro files. Use this with arbitrary nested avro schemas. Please refer to the fastavro documentation for its capabilities: https://github.com/fastavro/fastavro Parameters ---------- urlpath: string or list Absolute or relative filepath, URL (may include pr
(urlpath, blocksize=100000000, storage_options=None, compression=None)
| 71 | |
| 72 | |
| 73 | def read_avro(urlpath, blocksize=100000000, storage_options=None, compression=None): |
| 74 | """Read set of avro files |
| 75 | |
| 76 | Use this with arbitrary nested avro schemas. Please refer to the |
| 77 | fastavro documentation for its capabilities: |
| 78 | https://github.com/fastavro/fastavro |
| 79 | |
| 80 | Parameters |
| 81 | ---------- |
| 82 | urlpath: string or list |
| 83 | Absolute or relative filepath, URL (may include protocols like |
| 84 | ``s3://``), or globstring pointing to data. |
| 85 | blocksize: int or None |
| 86 | Size of chunks in bytes. If None, there will be no chunking and each |
| 87 | file will become one partition. |
| 88 | storage_options: dict or None |
| 89 | passed to backend file-system |
| 90 | compression: str or None |
| 91 | Compression format of the target(s), like 'gzip'. Should only be used |
| 92 | with blocksize=None. |
| 93 | """ |
| 94 | from dask import compute, delayed |
| 95 | from dask.bag import from_delayed |
| 96 | from dask.utils import import_required |
| 97 | |
| 98 | import_required( |
| 99 | "fastavro", "fastavro is a required dependency for using bag.read_avro()." |
| 100 | ) |
| 101 | |
| 102 | storage_options = storage_options or {} |
| 103 | if blocksize is not None: |
| 104 | fs, fs_token, paths = get_fs_token_paths( |
| 105 | urlpath, mode="rb", storage_options=storage_options |
| 106 | ) |
| 107 | dhead = delayed(open_head) |
| 108 | out = compute(*[dhead(fs, path, compression) for path in paths]) |
| 109 | heads, sizes = zip(*out) |
| 110 | dread = delayed(read_chunk) |
| 111 | |
| 112 | offsets = [] |
| 113 | lengths = [] |
| 114 | for size in sizes: |
| 115 | off = list(range(0, size, blocksize)) |
| 116 | length = [blocksize] * len(off) |
| 117 | offsets.append(off) |
| 118 | lengths.append(length) |
| 119 | |
| 120 | out = [] |
| 121 | for path, offset, length, head in zip(paths, offsets, lengths, heads): |
| 122 | delimiter = head["sync"] |
| 123 | f = OpenFile(fs, path, compression=compression) |
| 124 | token = fs_tokenize( |
| 125 | fs_token, delimiter, path, fs.ukey(path), compression, offset |
| 126 | ) |
| 127 | keys = [f"read-avro-{o}-{token}" for o in offset] |
| 128 | values = [ |
| 129 | dread(f, o, l, head, dask_key_name=key) |
| 130 | for o, key, l in zip(offset, keys, length) |
nothing calls this directly
no test coverage detected
searching dependent graphs…