()
| 157 | |
| 158 | @pytest_asyncio.fixture |
| 159 | async def haproxy_api_cleanup(): |
| 160 | registered_apis = [] |
| 161 | |
| 162 | def register(api: Optional[HAProxyApi]) -> None: |
| 163 | if api is not None: |
| 164 | registered_apis.append(api) |
| 165 | |
| 166 | yield register |
| 167 | |
| 168 | for api in registered_apis: |
| 169 | proc = getattr(api, "_proc", None) |
| 170 | if proc and proc.returncode is None: |
| 171 | try: |
| 172 | await api.stop() |
| 173 | except Exception as exc: # pragma: no cover - best effort cleanup |
| 174 | logger.warning(f"Failed to stop HAProxy API cleanly: {exc}") |
| 175 | try: |
| 176 | proc.kill() |
| 177 | await proc.wait() |
| 178 | except Exception as kill_exc: |
| 179 | logger.error( |
| 180 | f"Failed to kill HAProxy process {proc.pid}: {kill_exc}" |
| 181 | ) |
| 182 | elif proc and proc.returncode is not None: |
| 183 | continue |
| 184 | |
| 185 | |
| 186 | def test_generate_config_file_internal(haproxy_api_cleanup): |
no test coverage detected
searching dependent graphs…