Read Terraform outputs from the current state.
()
| 82 | |
@pytest.fixture(scope="session")
def terraform_outputs():
  """Read Terraform outputs from the current state.

  Runs ``<terraform> [extra args] output -json`` and flattens the parsed JSON
  into a mapping of output name to the ``value`` field of each entry.

  The invocation can be overridden through environment variables:

  * ``ADK_TERRAFORM_BIN``: executable to run (default ``"terraform"``).
  * ``ADK_TERRAFORM_CWD``: working directory (default ``TERRAFORM_DIR``).
  * ``ADK_TERRAFORM_ARGS``: extra arguments, split shell-style with
    ``shlex.split`` and inserted before the ``output`` subcommand.

  Returns:
    A dict mapping each Terraform output name to its value.

  The fixture skips when ``_get_project_id()`` returns a falsy value, and
  calls ``pytest.fail`` when the command cannot be started, exits non-zero,
  or produces output that is not the expected JSON object.
  """
  project = _get_project_id()
  if not project:
    pytest.skip(
        "GCP_PROJECT_ID not set and no gcloud default project configured"
    )

  tf_bin = os.environ.get("ADK_TERRAFORM_BIN", "terraform")
  tf_cwd = os.environ.get("ADK_TERRAFORM_CWD", TERRAFORM_DIR)
  tf_args = shlex.split(os.environ.get("ADK_TERRAFORM_ARGS", ""))

  # Built before the try block so the failure message below can always
  # reference the exact command that was attempted.
  cmd = [tf_bin] + tf_args + ["output", "-json"]

  try:
    result = subprocess.run(
        cmd,
        cwd=tf_cwd,
        capture_output=True,
        text=True,
        check=True,
    )
    raw = json.loads(result.stdout)
    return {name: entry["value"] for name, entry in raw.items()}
  except (
      subprocess.CalledProcessError,
      # OSError covers a missing binary or CWD (FileNotFoundError) as well as
      # a non-executable binary or a CWD that is not a directory.
      OSError,
      json.JSONDecodeError,
      # Valid JSON with an unexpected shape: not an object, or an entry
      # without a "value" key.
      AttributeError,
      KeyError,
      TypeError,
  ) as e:
    # str(CalledProcessError) only reports the exit status; the diagnostics
    # Terraform printed are in the captured stderr, so surface them too.
    stderr = (getattr(e, "stderr", None) or "").strip()
    stderr_details = f"\nStderr: {stderr}" if stderr else ""
    pytest.fail(
        "Failed to read Terraform outputs. Ensure 'terraform apply' has been"
        f" run successfully.\nCommand: {' '.join(cmd)}\nCWD:"
        f" {tf_cwd}\nError: {e}{stderr_details}"
    )
| 118 | |
| 119 | |
| 120 | @pytest.fixture(scope="session") |