(self, tmpdir, file_partitions, use_cuda_pinned_tensor, use_fd)
| 332 | |
@pytest.mark.parametrize('use_fd', [False, True])
def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor, use_fd):
    """Write a buffer partition-by-partition at increasing file offsets via async_pwrite.

    Each partition is appended to a reference file with ordinary Python file
    I/O and written to the AIO file at the matching offset; after every
    partition the two files must compare byte-for-byte equal.

    Args:
        tmpdir: directory handed to ``_get_file_path`` for both test files.
        file_partitions: partition lengths, expressed in units of ``BLOCK_SIZE``.
        use_cuda_pinned_tensor: when true the source buffer is a ``pin_memory()``
            tensor of random bytes; otherwise it is allocated by the AIO handle
            with ``new_cpu_locked_tensor`` and released with
            ``free_cpu_locked_tensor`` at the end.
        use_fd: when true the write targets an explicitly opened file
            descriptor; otherwise it targets the file name.
    """
    _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor)
    ref_file = _get_file_path(tmpdir, '_py_random')
    aio_file = _get_file_path(tmpdir, '_aio_random')
    partition_unit_size = BLOCK_SIZE
    # Byte length of every partition; their sum is the total file size.
    partition_sizes = [num_units * partition_unit_size for num_units in file_partitions]
    file_size = sum(partition_sizes)

    h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL)

    if use_cuda_pinned_tensor:
        data_buffer = torch.ByteTensor(list(os.urandom(file_size))).pin_memory()
    else:
        data_buffer = h.new_cpu_locked_tensor(file_size, torch.empty(0, dtype=torch.uint8))

    # The context manager closes the reference file even if an assertion fails.
    with open(ref_file, 'wb') as ref_fd:
        file_offset = 0
        for partition_size in partition_sizes:
            src_buffer = torch.narrow(data_buffer, 0, file_offset, partition_size)

            # Flush so the reference file on disk is complete before comparing.
            ref_fd.write(src_buffer.numpy().tobytes())
            ref_fd.flush()

            aio_fd = None
            try:
                if use_fd:
                    aio_fd = os.open(aio_file, flags=os.O_DIRECT | os.O_CREAT | os.O_WRONLY)
                    write_status = h.async_pwrite(buffer=src_buffer, fd=aio_fd, file_offset=file_offset)
                else:
                    write_status = h.async_pwrite(buffer=src_buffer, filename=aio_file, file_offset=file_offset)
                assert write_status == 0
                # NOTE(review): 1 is presumably the count of completed requests - confirm.
                wait_status = h.wait()
                assert wait_status == 1

                if use_fd:
                    # Previously the isfile() result was discarded; actually check it.
                    assert os.path.isfile(aio_file)
            finally:
                # Never leak the descriptor, even when an assertion above fails.
                if aio_fd is not None:
                    os.close(aio_fd)

            # Drop cached comparison results so the current file contents are re-read.
            filecmp.clear_cache()
            assert filecmp.cmp(ref_file, aio_file, shallow=False)

            file_offset += partition_size

    if not use_cuda_pinned_tensor:
        h.free_cpu_locked_tensor(data_buffer)
| 382 | |
| 383 | def test_offset_read(self, tmpdir, file_partitions, use_cuda_pinned_tensor): |
| 384 |
nothing calls this directly
no test coverage detected