(g, fg)
| 1011 | @parametrize_idtype |
| 1012 | def test_flatten(idtype): |
def check_mapping(g, fg):
    """Assert that every edge of ``fg`` maps back to a matching edge of ``g``.

    ``fg`` is presumably the flattened counterpart of ``g`` (confirm against
    the caller). For edge ``i`` of ``fg``, the original edge type index and
    edge ID are read from ``fg.edata[dgl.ETYPE]`` and ``fg.edata[dgl.EID]``.
    The endpoints of that edge in ``g`` must equal the ``dgl.NID`` values
    stored on the endpoints of edge ``i`` in ``fg``, and the ``dgl.NTYPE``
    values stored there must index the source / destination node type names
    of the original canonical edge type.
    """
    # A single node type plays both roles; otherwise the first entry is the
    # source type and the second the destination type.
    src_ntype = fg.ntypes[0]
    dst_ntype = fg.ntypes[0] if len(fg.ntypes) == 1 else fg.ntypes[1]

    orig_etype_ids = F.asnumpy(fg.edata[dgl.ETYPE]).tolist()
    orig_edge_ids = F.asnumpy(fg.edata[dgl.EID]).tolist()

    for flat_eid, (etype_id, orig_eid) in enumerate(
        zip(orig_etype_ids, orig_edge_ids)
    ):
        orig_src, orig_dst = g.find_edges(
            [orig_eid], g.canonical_etypes[etype_id]
        )
        flat_src, flat_dst = fg.find_edges([flat_eid])
        # TODO(gq): I feel this code is quite redundant; can we just add new
        # members (like "induced_srcid") to returned heterograph object and
        # not store them as features?
        # Each entry: (node in g, node in fg, node type of fg to look in,
        # position of the matching node type name in the canonical etype).
        endpoints = (
            (orig_src, flat_src, src_ntype, 0),
            (orig_dst, flat_dst, dst_ntype, 2),
        )
        for orig_node, flat_node, ntype, name_pos in endpoints:
            # The stored original node ID must match the endpoint in ``g``.
            assert F.asnumpy(orig_node) == F.asnumpy(
                F.gather_row(fg.nodes[ntype].data[dgl.NID], flat_node)[0]
            )
            # The stored node type index must name the expected node type.
            type_id = F.asnumpy(
                F.gather_row(fg.nodes[ntype].data[dgl.NTYPE], flat_node)
            ).item()
            assert (
                g.canonical_etypes[etype_id][name_pos] == g.ntypes[type_id]
            )
| 1042 | |
| 1043 | # check for wildcard slices |
| 1044 | g = create_test_heterograph(idtype) |
no test coverage detected