hub / github.com/dask/dask / read_avro

Function read_avro

dask/bag/avro.py:73–139

Read a set of Avro files. Use this with arbitrarily nested Avro schemas. Please refer to the fastavro documentation for its capabilities: https://github.com/fastavro/fastavro

(urlpath, blocksize=100000000, storage_options=None, compression=None)


def read_avro(urlpath, blocksize=100000000, storage_options=None, compression=None):
    """Read set of avro files

    Use this with arbitrary nested avro schemas. Please refer to the
    fastavro documentation for its capabilities:
    https://github.com/fastavro/fastavro

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    blocksize: int or None
        Size of chunks in bytes. If None, there will be no chunking and each
        file will become one partition.
    storage_options: dict or None
        passed to backend file-system
    compression: str or None
        Compression format of the target(s), like 'gzip'. Should only be used
        with blocksize=None.
    """
    from dask import compute, delayed
    from dask.bag import from_delayed
    from dask.utils import import_required

    import_required(
        "fastavro", "fastavro is a required dependency for using bag.read_avro()."
    )

    storage_options = storage_options or {}
    if blocksize is not None:
        fs, fs_token, paths = get_fs_token_paths(
            urlpath, mode="rb", storage_options=storage_options
        )
        dhead = delayed(open_head)
        out = compute(*[dhead(fs, path, compression) for path in paths])
        heads, sizes = zip(*out)
        dread = delayed(read_chunk)

        offsets = []
        lengths = []
        for size in sizes:
            off = list(range(0, size, blocksize))
            length = [blocksize] * len(off)
            offsets.append(off)
            lengths.append(length)

        out = []
        for path, offset, length, head in zip(paths, offsets, lengths, heads):
            delimiter = head["sync"]
            f = OpenFile(fs, path, compression=compression)
            token = fs_tokenize(
                fs_token, delimiter, path, fs.ukey(path), compression, offset
            )
            keys = [f"read-avro-{o}-{token}" for o in offset]
            values = [
                dread(f, o, l, head, dask_key_name=key)
                for o, key, l in zip(offset, keys, length)
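
The chunk-planning loop in the listing above (the `for size in sizes` block) can be tried out on its own without dask or fastavro. The following is a minimal sketch, not part of the library: `plan_chunks` is a hypothetical helper name, and the file sizes are made-up values chosen only to show how offsets and nominal lengths fall out of `range(0, size, blocksize)`.

```python
def plan_chunks(sizes, blocksize):
    """Return per-file offsets and nominal lengths.

    Hypothetical helper: it repeats the arithmetic of the loop in
    read_avro, but takes plain integers instead of real file sizes.
    """
    offsets, lengths = [], []
    for size in sizes:
        # One chunk starts every `blocksize` bytes, beginning at byte 0.
        off = list(range(0, size, blocksize))
        offsets.append(off)
        # Every chunk gets the same nominal length, including the last one.
        lengths.append([blocksize] * len(off))
    return offsets, lengths


# Made-up file sizes in bytes, for illustration only.
sizes = [250, 100, 0]
offsets, lengths = plan_chunks(sizes, blocksize=100)
for size, off, length in zip(sizes, offsets, lengths):
    print(size, off, length)
```

Two things are worth noticing in the result. The last chunk of a file keeps the full nominal length even when it would run past the end of the file (offset 200 plus length 100 exceeds a size of 250), so the reader has to cope with a short final read. A file of size 0 yields no offsets at all and therefore contributes no partitions.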

Callers

nothing calls this directly

Calls 4

import_required · Function · 0.90
delayed · Function · 0.90
compute · Function · 0.90
from_delayed · Function · 0.90

Tested by

no test coverage detected
