(
port_num,
rank,
num_parts,
world_size,
partitions_dir,
ntypes,
id_map,
test_data,
)
| 54 | |
| 55 | |
| 56 | def _run( |
| 57 | port_num, |
| 58 | rank, |
| 59 | num_parts, |
| 60 | world_size, |
| 61 | partitions_dir, |
| 62 | ntypes, |
| 63 | id_map, |
| 64 | test_data, |
| 65 | ): |
| 66 | os.environ["MASTER_ADDR"] = "127.0.0.1" |
| 67 | os.environ["MASTER_PORT"] = str(port_num) |
| 68 | |
| 69 | _init_process_group(rank, world_size) |
| 70 | lookup = _create_lookup_service( |
| 71 | partitions_dir, ntypes, id_map, rank, world_size, num_parts |
| 72 | ) |
| 73 | |
| 74 | tests_exec = 0 |
| 75 | for worker, data in test_data.items(): |
| 76 | if f"rank-{rank}" == worker: |
| 77 | for item in data: |
| 78 | method = item[0] |
| 79 | request = item[1] |
| 80 | response = item[2] |
| 81 | |
| 82 | if method == "getpartitionids": |
| 83 | ret_val = lookup.get_partition_ids(request) |
| 84 | tests_exec += 1 |
| 85 | assert np.all(ret_val == response) |
| 86 | else: |
| 87 | assert False |
| 88 | |
| 89 | # ensure all the tests are executed. |
| 90 | rank_counts = allgather_sizes([tests_exec], world_size, num_parts, True) |
| 91 | assert np.sum(rank_counts) == len(test_data) |
| 92 | |
| 93 | |
| 94 | def _single_machine_run( |
nothing calls this directly
no test coverage detected