(
workspace_files: FileStorage,
source_name: str,
destination_name: str,
has_source_section: bool = True,
)
| 748 | |
| 749 | |
def assert_source_files(
    workspace_files: FileStorage,
    source_name: str,
    destination_name: str,
    has_source_section: bool = True,
) -> Tuple[PipelineScriptVisitor, SecretsTomlProvider]:
    """Assert the workspace state expected for the source `source_name`.

    Runs `assert_common_files` for the `<source_name>_pipeline.py` script and
    then checks the source-specific pieces: the source folder, the source
    section in the secrets, and the hashes recorded in the local index.

    Args:
        workspace_files: Storage whose content is inspected.
        source_name: Name of the source; also the name of its folder and of
            the secrets section that is looked up.
        destination_name: Passed through to `assert_common_files`.
        has_source_section: When truthy, the secrets lookup for the source must
            return a value; otherwise it must return `None`.

    Returns:
        The `(visitor, secrets)` pair produced by `assert_common_files`.

    Raises:
        AssertionError: If any of the checks fails.
    """
    pipeline_script = source_name + "_pipeline.py"
    visitor, secrets = assert_common_files(
        workspace_files, pipeline_script, destination_name
    )

    # a source folder must exist exactly when the source is neither listed in
    # CORE_SOURCES nor in TEMPLATES
    folder_present = workspace_files.has_folder(source_name)
    folder_expected = source_name not in [*CORE_SOURCES, *TEMPLATES]
    assert folder_present == folder_expected

    # presence of the source section must match what the caller expects
    source_secrets = secrets.get_value(source_name, type, None, source_name)
    section_present = source_secrets is not None
    assert section_present if has_source_section else not section_present

    # expected byte length per recorded digest: sha1 commit, git sha, sha3
    digest_sizes = (("commit_sha", 20), ("git_sha", 20), ("sha3_256", 32))
    local_index = files_ops.load_verified_sources_local_index(source_name)
    for entry in local_index["files"].values():
        # all files have the newest commit (first time clone)
        assert entry["commit_sha"] == local_index["last_commit_sha"]
        for digest_key, byte_count in digest_sizes:
            assert len(HexBytes(entry[digest_key])) == byte_count

    return visitor, secrets
| 779 | |
| 780 | |
| 781 | def assert_common_files( |
no test coverage detected