(self)
| 198 | self.consul_http_port = consul.http_port |
| 199 | |
async def init(self):
    """Run ``stolonctl init`` for this cluster against the local Consul store.

    The executable comes from the ``EDGEDB_TEST_STOLON_CTL`` environment
    variable and falls back to ``stolonctl``.  The store endpoint is
    ``http://127.0.0.1:<self.consul_http_port>``.

    Returns:
        The decoded, combined stdout/stderr of the command when it exits
        with a non-zero status; an empty string otherwise.
    """
    stolonctl = os.environ.get("EDGEDB_TEST_STOLON_CTL", "stolonctl")
    store_endpoint = f"http://127.0.0.1:{self.consul_http_port}"
    # JSON document passed as the positional argument of the "init" command.
    init_spec = json.dumps({
        "initMode": "new",
        "sleepInterval": "0.5s",
        "requestTimeout": "1s",
        "failInterval": "2s",
    })
    command = [
        stolonctl,
        "--cluster-name",
        self.cluster_name,
        "--store-backend=consul",
        "--store-endpoints",
        store_endpoint,
        "init",
        "--yes",
        init_spec,
    ]

    if self.debug:
        print("Running:", command)

    # stderr is folded into stdout so a single stream carries all output.
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    output, _ = await proc.communicate()

    # Only surface the captured output when the command failed.
    return output.decode() if proc.returncode else ""
| 228 | |
| 229 | async def __aenter__(self): |
| 230 | start = time.monotonic() |
no test coverage detected