(*args, **kwargs)
| 1361 | def track_rewrites(name:Callable[..., str|TracingKey]|bool=True, replay:bool=False): |
| 1362 | def _decorator(func): |
| 1363 | def __wrapper(*args, **kwargs): |
| 1364 | fn = key = func.__name__ |
| 1365 | idx = -1 |
| 1366 | if TRACK_MATCH_STATS >= 2: |
| 1367 | add_trace_group(key:=TracingKey(n:=f"{fn} n{next(_name_cnt.setdefault(fn, itertools.count(1)))}", (n,))) |
| 1368 | active_group.append(idx:=len(tracked_keys)-1) |
| 1369 | with cpu_profile(key, "TINY") as e: |
| 1370 | ret = func(*args, **kwargs) |
| 1371 | if TRACK_MATCH_STATS >= 2: active_group.pop() |
| 1372 | if TRACK_MATCH_STATS >= 2 and callable(name): |
| 1373 | name_ret = name(*args, **kwargs, ret=ret) |
| 1374 | assert isinstance(name_ret, (TracingKey, str)), f"name function returned {type(name_ret)}" |
| 1375 | tracked_keys[idx] = k = TracingKey(n:=tracked_keys[idx].display_name.replace(fn, name_ret), (n,)) if isinstance(name_ret, str) else name_ret |
| 1376 | e.name = TracingKey(k.display_name if isinstance(name_ret, str) else f"{fn} for {k.display_name}", k.keys) |
| 1377 | if CAPTURE_PROCESS_REPLAY and replay: |
| 1378 | # find the unittest frame we're capturing in |
| 1379 | frm = sys._getframe(1) |
| 1380 | while (f_back:=frm.f_back) is not None and "unittest" not in f_back.f_code.co_filename: frm = f_back |
| 1381 | loc = f"{frm.f_code.co_filename.split('/')[-1]}:{frm.f_lineno} {frm.f_code.co_name}" |
| 1382 | # capture global context vars and all the args passed in |
| 1383 | inputs = (fn, args, kwargs, ContextVar._cache) |
| 1384 | replay_capture.append(pickle.dumps(inputs+(loc, ret))) |
| 1385 | return ret |
| 1386 | return __wrapper |
| 1387 | return _decorator |
| 1388 |
nothing calls this directly
no test coverage detected
searching dependent graphs…