MCPcopy
hub / github.com/deepspeedai/DeepSpeed / test_async_read

Method test_async_read

tests/unit/ops/aio/test_aio.py:120–148  ·  view source on GitHub ↗
(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap_events, use_unpinned_tensor)

Source from the content-addressed store, hash-verified

118
119 @pytest.mark.parametrize("use_unpinned_tensor", [True, False])
120 def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap_events, use_unpinned_tensor):
121 _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor)
122
123 use_cpu_locked_tensor = False
124 h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, single_submit, overlap_events, IO_PARALLEL)
125
126 if use_unpinned_tensor:
127 aio_buffer = torch.empty(IO_SIZE, dtype=torch.uint8, device=get_accelerator().device_name())
128 elif use_cuda_pinned_tensor:
129 aio_buffer = get_accelerator().pin_memory(torch.empty(IO_SIZE, dtype=torch.uint8, device='cpu'))
130 else:
131 aio_buffer = h.new_cpu_locked_tensor(IO_SIZE, torch.empty(0, dtype=torch.uint8))
132 use_cpu_locked_tensor = True
133
134 _validate_handle_state(h, single_submit, overlap_events)
135
136 ref_file, _ = _do_ref_write(tmpdir)
137 read_status = h.async_pread(aio_buffer, ref_file, 0)
138 assert read_status == 0
139
140 wait_status = h.wait()
141 assert wait_status == 1
142
143 with open(ref_file, 'rb') as f:
144 ref_buffer = list(f.read())
145 assert ref_buffer == aio_buffer.tolist()
146
147 if use_cpu_locked_tensor:
148 h.free_cpu_locked_tensor(aio_buffer)
149
150
151@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False])

Callers

nothing calls this directly

Calls 10

AsyncIOBuilderClass · 0.90
get_acceleratorFunction · 0.90
_validate_handle_stateFunction · 0.70
_do_ref_writeFunction · 0.70
loadMethod · 0.45
device_nameMethod · 0.45
pin_memoryMethod · 0.45
waitMethod · 0.45
readMethod · 0.45

Tested by

no test coverage detected