(self)
| 77 | class TestCompositeReader(TestCase): |
@unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins')
def test_composite_reader(self):
    """Pipe several source datasets through a CompositeReader and check the copy.

    Creates ``num_srcs`` source datasets, combines their readers into a
    single ``CompositeReader``, pipes that reader into one destination
    dataset whose schema has one field per source, and asserts that each
    destination field holds the same values as its source.
    """
    ws = workspace.C.Workspace()
    session = LocalSession(ws)

    num_srcs = 3
    size = 100

    # One name and one starting offset per source dataset.
    names = []
    offsets = []
    for idx in range(num_srcs):
        names.append("src_{}".format(idx))
        offsets.append(idx * size)

    src_dses = []
    for name, offset in zip(names, offsets):
        src_dses.append(
            make_source_dataset(ws, offset=offset, size=size, name=name))

    # Snapshot the first field blob of every source only after all of the
    # sources exist, so the check below can detect one source clobbering
    # another.
    data = []
    for src in src_dses:
        first_blob = str(src.field_blobs[0])
        data.append(ws.fetch_blob(first_blob))

    # Sanity check we didn't overwrite anything
    for fetched, offset in zip(data, offsets):
        expected = range(offset, offset + size)
        npt.assert_array_equal(fetched, expected)

    # Make an identically-sized empty destination dataset
    fields = [
        (name, src_ds.content().clone_schema())
        for name, src_ds in zip(names, src_dses)
    ]
    dst_ds_schema = schema.Struct(*fields)
    dst_ds = make_destination_dataset(ws, dst_ds_schema)

    with TaskGroup() as tg:
        readers = [src_ds.reader() for src_ds in src_dses]
        reader = CompositeReader(names, readers)
        pipe(reader, dst_ds.writer(), num_runtime_threads=3)
    session.run(tg)

    # The written values are sorted before comparison -- presumably because
    # the multi-threaded pipe does not guarantee row order (TODO confirm).
    for i, (name, source_values) in enumerate(zip(names, data)):
        label = str(dst_ds.content()[name].label())
        written_data = sorted(ws.fetch_blob(label))
        npt.assert_array_equal(source_values, written_data, "i: {}".format(i))
| 113 | |
| 114 | @unittest.skipIf(os.environ.get('JENKINS_URL'), 'Flaky test on Jenkins') |
| 115 | def test_composite_reader_builder(self): |
nothing calls this directly
no test coverage detected