(self, tmpdir, async_queue)
| 209 | |
@pytest.mark.parametrize("async_queue", [2, 3])
def test_write(self, tmpdir, async_queue):
    """Queue ``async_queue`` asynchronous writes on one GDS handle and verify them.

    One reference file/buffer pair is produced per queued operation via
    ``_do_ref_write``. The same number of writes is then submitted through
    ``async_pwrite`` on a single handle, and each written file is compared
    byte-for-byte against its reference file.
    """
    # Build the reference outputs first; the unpacking also enforces that
    # each helper call yields exactly a (file, buffer) pair.
    ref_results = [_do_ref_write(tmpdir, idx) for idx in range(async_queue)]
    ref_files = [ref_file for ref_file, _ in ref_results]
    ref_buffers = [ref_buffer for _, ref_buffer in ref_results]

    single_submit = True
    overlap_events = True
    handle = GDSBuilder().load().gds_handle(BLOCK_SIZE, QUEUE_DEPTH, single_submit, overlap_events, IO_PARALLEL)

    # One target file and device buffer per queued operation.
    gds_results = [
        _get_test_write_file_and_device_buffer(tmpdir, ref_buffer, handle, idx)
        for idx, ref_buffer in enumerate(ref_buffers)
    ]
    gds_files = [gds_file for gds_file, _ in gds_results]
    gds_buffers = [gds_buffer for _, gds_buffer in gds_results]

    _validate_handle_state(handle, single_submit, overlap_events)

    # Submit every write before waiting, so the operations are queued together.
    for gds_buffer, gds_file in zip(gds_buffers, gds_files):
        submit_status = handle.async_pwrite(gds_buffer, gds_file, 0)
        assert submit_status == 0

    # The test expects wait() to return the number of submitted operations.
    completed = handle.wait()
    assert completed == async_queue

    for pinned_buffer in gds_buffers:
        handle.unpin_device_tensor(pinned_buffer)

    for ref_file, gds_file in zip(ref_files, gds_files):
        assert os.path.isfile(gds_file)

        # Drop cached filecmp results so each comparison re-reads the files;
        # shallow=False compares contents rather than os.stat() signatures.
        filecmp.clear_cache()
        assert filecmp.cmp(ref_file, gds_file, shallow=False)
| 247 | |
| 248 | |
| 249 | @pytest.mark.parametrize("use_new_api", [True, False]) |
nothing calls this directly
no test coverage detected