(self, args: argparse.Namespace, parser: argparse.ArgumentParser)
| 56 | ) |
| 57 | |
| 58 | def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None: |
| 59 | max_cols = SummaryReporter.N_COLUMNS |
| 60 | if args.sort_column < 1 or args.sort_column > max_cols: |
| 61 | parser.error(f"The --sort-column argument must be between 1 and {max_cols}") |
| 62 | |
| 63 | result_path = Path(args.results) |
| 64 | if not result_path.exists() or not result_path.is_file(): |
| 65 | raise MemrayCommandError(f"No such file: {args.results}", exit_code=1) |
| 66 | |
| 67 | try: |
| 68 | reader = FileReader(os.fspath(args.results), report_progress=True) |
| 69 | if reader.metadata.has_native_traces: |
| 70 | warn_if_not_enough_symbols() |
| 71 | |
| 72 | if not args.temporary_allocation_threshold >= 0: |
| 73 | warn_if_file_is_not_aggregated_and_is_too_big(reader, result_path) |
| 74 | |
| 75 | if args.temporary_allocation_threshold >= 0: |
| 76 | snapshot = iter( |
| 77 | reader.get_temporary_allocation_records( |
| 78 | threshold=args.temporary_allocation_threshold, |
| 79 | merge_threads=False, |
| 80 | ) |
| 81 | ) |
| 82 | else: |
| 83 | snapshot = iter( |
| 84 | reader.get_high_watermark_allocation_records(merge_threads=True) |
| 85 | ) |
| 86 | except OSError as e: |
| 87 | raise MemrayCommandError( |
| 88 | f"Failed to parse allocation records in {result_path}\nReason: {e}", |
| 89 | exit_code=1, |
| 90 | ) |
| 91 | reporter = SummaryReporter.from_snapshot( |
| 92 | snapshot, |
| 93 | native=reader.metadata.has_native_traces, |
| 94 | ) |
| 95 | reporter.render(sort_column=args.sort_column, max_rows=args.max_rows) |
nothing calls this directly
no test coverage detected