(self)
| 236 | return textwrap.dedent(source) |
| 237 | |
def _initialize_app(self):
    """Prepare the app directory and load the app under test.

    When ``self.app_source`` is set, a new app is initialized from the
    default template inside ``self.app_path`` and the generated source is
    written to ``self.app_module_path``. Otherwise only the frontend
    dependencies are initialized. In both cases a fresh copy of the base
    registration context is installed, the config is reloaded, and the
    validated app instance, app module and ASGI app are stored on ``self``.
    """
    # Tests must not report telemetry.
    os.environ["REFLEX_TELEMETRY_ENABLED"] = "false"
    # Start from an empty memo registry so that component definitions
    # compiled for an earlier AppHarness app do not leak into this one.
    MEMOS.clear()
    self.app_path.mkdir(parents=True, exist_ok=True)

    if self.app_source is None:
        # No source to generate: just initialize the web folder.
        with chdir(self.app_path):
            reflex.utils.prerequisites.initialize_frontend_dependencies()
    else:
        # Globals are gathered from the app source as given, before a
        # functools.partial is unwrapped to its underlying function.
        harness_globals = self._get_globals_from_signature(self.app_source)
        if isinstance(self.app_source, functools.partial):
            self.app_source = self.app_source.func
        # Generated module = global definitions, then the source taken
        # from the function or module object.
        globals_source = "\n".join(
            self.get_app_global_source(name, value)
            for name, value in harness_globals.items()
        )
        body_source = self._get_source_from_app_source(self.app_source)
        module_source = "\n".join((globals_source, body_source))
        get_config().loglevel = reflex.constants.LogLevel.INFO
        with chdir(self.app_path):
            reflex.reflex._init(
                name=self.app_name,
                template=reflex.constants.Templates.DEFAULT,
            )
            self.app_module_path.write_text(module_source)

    with chdir(self.app_path):
        # Each app gets its own registration context, copied from the
        # base context that is captured the first time through here.
        base_context = AppHarness._base_registration_context
        if base_context is None:
            base_context = AppHarness._base_registration_context = (
                RegistrationContext.ensure_context()
            )
        self._registry_token = RegistrationContext.set(deepcopy(base_context))
        # Reload the config so a different app under test is picked up.
        reloaded_config = get_config(reload=True)
        # Running via pytest must not cause State assignment to be skipped.
        os.environ.pop(reflex.constants.PYTEST_CURRENT_TEST, None)
        os.environ[reflex.constants.APP_HARNESS_FLAG] = "true"
        # Generated apps are always compiled; pre-existing app modules are
        # reloaded only if already imported, so they can re-register memo
        # definitions.
        needs_reload = (
            self.app_source is not None
            or reloaded_config.module in sys.modules
        )
        validated = reflex.utils.prerequisites.get_and_validate_app(
            reload=needs_reload
        )
        self.app_instance, self.app_module = validated
    self.app_asgi = self.app_instance()
| 292 | |
| 293 | def _reload_state_module(self): |
| 294 | """Reload the rx.State module to avoid conflict when reloading.""" |
no test coverage detected