(dataset_dir, retrieval, features, matches, results,
skip_matches=None)
| 112 | |
| 113 | |
def main(dataset_dir, retrieval, features, matches, results,
         skip_matches=None):
    """Localize every query listed in the retrieval file and save the results.

    For each query, `pose_from_cluster` is called with the query's retrieved
    database entries and the opened feature/match HDF5 files. The estimated
    poses are written as text to `results`, and a pickle with the per-query
    details is written next to it as `{results}_logs.pkl`.

    Args:
        dataset_dir: Forwarded unchanged to `pose_from_cluster`.
        retrieval: Path-like object (must provide `.exists()`) that is parsed
            by `parse_retrieval` into a mapping from query to database
            entries.
        features: Path-like object (must provide `.exists()`) of the HDF5
            file holding the local features.
        matches: Path-like object (must provide `.exists()`) of the HDF5
            file holding the matches.
        results: Path of the text file that receives one line per query in
            the form `<name> <qvec> <tvec>`, where `<name>` is the last
            "/"-separated component of the query.
        skip_matches: Forwarded unchanged to `pose_from_cluster`.

    Raises:
        AssertionError: If `retrieval`, `features` or `matches` does not
            exist.
    """
    assert retrieval.exists(), retrieval
    assert features.exists(), features
    assert matches.exists(), matches

    retrieval_dict = parse_retrieval(retrieval)
    queries = list(retrieval_dict.keys())

    poses = {}
    logs = {
        'features': features,
        'matches': matches,
        'retrieval': retrieval,
        'loc': {},
    }
    logger.info('Starting localization...')
    # Open both HDF5 files in a `with` block so the handles are released even
    # when localization of a query raises; previously they were never closed.
    with h5py.File(features, 'r', libver='latest') as feature_file, \
            h5py.File(matches, 'r', libver='latest') as match_file:
        for q in tqdm(queries):
            db = retrieval_dict[q]
            ret, mkpq, mkpr, mkp3d, indices, num_matches = pose_from_cluster(
                dataset_dir, q, db, feature_file, match_file, skip_matches)

            poses[q] = (ret['qvec'], ret['tvec'])
            logs['loc'][q] = {
                'db': db,
                'PnP_ret': ret,
                'keypoints_query': mkpq,
                'keypoints_db': mkpr,
                '3d_points': mkp3d,
                'indices_db': indices,
                'num_matches': num_matches,
            }

    logger.info(f'Writing poses to {results}...')
    with open(results, 'w') as f:
        for q in queries:
            qvec, tvec = poses[q]
            qvec = ' '.join(map(str, qvec))
            tvec = ' '.join(map(str, tvec))
            # Only the last path component of the query is written.
            name = q.split("/")[-1]
            f.write(f'{name} {qvec} {tvec}\n')

    logs_path = f'{results}_logs.pkl'
    logger.info(f'Writing logs to {logs_path}...')
    with open(logs_path, 'wb') as f:
        pickle.dump(logs, f)
    logger.info('Done!')
| 165 | |
| 166 | |
| 167 | if __name__ == '__main__': |
no test coverage detected