hub / github.com/getsentry/sentry / process_and_save

Function process_and_save

tests/sentry/tasks/test_reprocessing2.py:53–76
(default_project, task_runner)

@pytest.fixture
def process_and_save(default_project, task_runner):
    def inner(data, seconds_ago=1):
        # Set platform to native so all parts of reprocessing fire, symbolication will
        # not happen without this set to certain values
        data.setdefault("platform", "native")
        # Every request to snuba has a timestamp that's clamped in a curious way to
        # ensure data consistency
        data.setdefault(
            "timestamp", before_now(seconds=seconds_ago).replace(microsecond=0).isoformat()
        )
        mgr = EventManager(data=data, project=default_project)
        mgr.normalize()
        data = mgr.get_data()
        event_id = data["event_id"]
        cache_key = event_processing_store.store(dict(data))

        with task_runner():
            # factories.store_event would almost be suitable for this, but let's
            # actually run through stacktrace processing once
            preprocess_event(start_time=time(), cache_key=cache_key, data=data)

        return event_id

    return inner
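
The defaulting step at the top of `inner` can be tried in isolation. The following is a minimal sketch using only the standard library; `before_now_stub` and `apply_defaults` are hypothetical names introduced for illustration, and the stub assumes that Sentry's `before_now` helper returns a timezone-aware UTC datetime offset into the past. It shows that `setdefault` fills in `platform` and a whole-second ISO timestamp only when the caller has not supplied them.

```python
from datetime import datetime, timedelta, timezone


def before_now_stub(seconds: int) -> datetime:
    # Hypothetical stand-in for Sentry's before_now test helper (assumption:
    # the real helper returns an aware UTC datetime `seconds` in the past).
    return datetime.now(timezone.utc) - timedelta(seconds=seconds)


def apply_defaults(data: dict, seconds_ago: int = 1) -> dict:
    # Mirrors the two setdefault calls in the fixture: existing keys win,
    # missing keys get the default value.
    data.setdefault("platform", "native")
    data.setdefault(
        "timestamp",
        # Dropping microseconds yields a whole-second ISO 8601 string.
        before_now_stub(seconds=seconds_ago).replace(microsecond=0).isoformat(),
    )
    return data


# No keys supplied: both defaults are applied.
filled = apply_defaults({})

# Keys supplied by the caller are left untouched.
kept = apply_defaults(
    {"platform": "python", "timestamp": "2020-01-01T00:00:00+00:00"}
)
```

Because `setdefault` never overwrites an existing key, a test that needs a different platform or a specific timestamp can pass it in `data` and the fixture will respect it.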

Calls

no outgoing calls

Tested by

no test coverage detected