(self, query, **kwargs)
| 145 | return self.factory.create_query(schedule=self.schedule(**kwargs)) |
| 146 | |
def fake_previous_execution(self, query, **kwargs):
    """Give ``query`` a fabricated, already-retrieved result.

    ``kwargs`` are passed straight to ``datetime.timedelta`` and express how
    far before ``utcnow()`` the fabricated result was retrieved.  The result
    is built through ``self.factory.create_query_result`` using the query's
    own text and hash, and is then assigned to ``query.latest_query_data``.
    """
    query.latest_query_data = self.factory.create_query_result(
        # "Now" minus the requested offset becomes the retrieval timestamp.
        retrieved_at=utcnow() - datetime.timedelta(**kwargs),
        query_text=query.query_text,
        query_hash=query.query_hash,
    )
| 155 | |
| 156 | # TODO: this test can be simplified by refactoring it to use a mock version of should_schedule_next. |
| 157 | def test_outdated_queries_skips_unscheduled_queries(self): |
no test coverage detected