(cls)
| 109 | |
@classmethod
def setup_class(cls):
    """Record (or load) one log segment and cache its contents on the class.

    With ``DEBUG`` present in the environment, nothing is launched: the most
    recently modified directory under ``Paths.log_root()`` that contains an
    ``rlog.zst`` is loaded, and only ``cls.lr`` and ``cls.ts`` are set.

    Otherwise the log root is removed, ``system/manager/manager.py`` is
    started as a subprocess and left running for ``TEST_DURATION`` seconds,
    and the single segment it produced is loaded. In that mode the class
    attributes populated are:

    * ``cls.manager_st`` -- ``time.monotonic()`` taken just before launch
    * ``cls.lr``         -- list of every message in the segment's rlog
    * ``cls.ts``         -- result of ``msgs_to_time_series(cls.lr)``
    * ``cls.log_sizes``  -- ``{Path: st_size / 1e6}`` for each file in the
      segment directory
    * ``cls.msgs``       -- messages grouped by ``m.which()``

    Raises:
        AssertionError: if no ``CurrentRoute`` param was written, if the
            route does not have exactly one segment directory, or if the
            segment directory contains a non-file entry.
    """
    if "DEBUG" in os.environ:
        # Reuse an existing recording: newest segment that has an rlog.
        segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.zst")), Path(Paths.log_root()).iterdir())
        segs = sorted(segs, key=lambda x: x.stat().st_mtime)
        cls.lr = list(LogReader(os.path.join(segs[-1], "rlog.zst")))
        cls.ts = msgs_to_time_series(cls.lr)
        return

    # setup env
    params = Params()
    params.remove("CurrentRoute")
    params.put_bool("RecordFront", True, block=True)
    set_params_enabled()
    os.environ['REPLAY'] = '1'
    os.environ['MSGQ_PREALLOC'] = '1'
    os.environ['TESTING_CLOSET'] = '1'
    # Start from an empty log root so the route glob below only sees this run.
    if os.path.exists(Paths.log_root()):
        shutil.rmtree(Paths.log_root())

    # start manager and run openpilot for TEST_DURATION
    proc = None
    try:
        manager_path = os.path.join(BASEDIR, "system/manager/manager.py")
        cls.manager_st = time.monotonic()
        proc = subprocess.Popen(["python", manager_path])

        # Block until the first carState message has been seen.
        # NOTE(review): Timeout presumably raises once 30 s elapse -- confirm.
        sm = messaging.SubMaster(['carState'])
        with Timeout(30, "controls didn't start"):
            while not sm.seen['carState']:
                sm.update(1000)

        route = params.get("CurrentRoute")
        assert route is not None

        segs = list(Path(Paths.log_root()).glob(f"{route}--*"))
        assert len(segs) == 1

        time.sleep(TEST_DURATION)
    finally:
        if proc is not None:
            proc.terminate()
            # Popen.wait(timeout) raises TimeoutExpired instead of returning
            # None, so the force-kill fallback has to live in an except
            # clause. Handling it here also keeps a timeout from replacing
            # an exception raised in the try body above.
            try:
                proc.wait(60)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the killed process

    cls.lr = list(LogReader(os.path.join(str(segs[0]), "rlog.zst")))
    st = time.monotonic()
    cls.ts = msgs_to_time_series(cls.lr)
    print("msgs to time series", time.monotonic() - st)
    log_path = segs[0]

    # Size of every file in the segment directory, in units of 1e6 bytes.
    cls.log_sizes = {}
    for f in log_path.iterdir():
        assert f.is_file()
        cls.log_sizes[f] = f.stat().st_size / 1e6

    # Group messages by their union field name for per-service lookups.
    cls.msgs = defaultdict(list)
    for m in cls.lr:
        cls.msgs[m.which()].append(m)
nothing calls this directly
no test coverage detected