(
ctx: click.Context,
)
| 486 | |
| 487 | |
| 488 | def create_feature_store( |
| 489 | ctx: click.Context, |
| 490 | ) -> FeatureStore: |
| 491 | repo = ctx.obj["CHDIR"] |
| 492 | # If we received a base64 encoded version of feature_store.yaml, use that |
| 493 | config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) |
| 494 | if config_base64: |
| 495 | print("Received base64 encoded feature_store.yaml") |
| 496 | config_bytes = base64.b64decode(config_base64) |
| 497 | # Create a new unique directory for writing feature_store.yaml |
| 498 | repo_path = Path(tempfile.mkdtemp()) |
| 499 | with open(repo_path / "feature_store.yaml", "wb") as f: |
| 500 | f.write(config_bytes) |
| 501 | return FeatureStore(repo_path=str(repo_path.resolve())) |
| 502 | else: |
| 503 | fs_yaml_file = ctx.obj["FS_YAML_FILE"] |
| 504 | cli_check_repo(repo, fs_yaml_file) |
| 505 | return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) |
| 506 | |
| 507 | |
| 508 | def apply_total( |
no test coverage detected