(
wheel, # type: Union[str, InstallableWheel]
destination, # type: str
compile=False, # type: bool
use_system_time=False, # type: bool
override_wheel_file_name=None, # type: Optional[str]
)
| 740 | |
| 741 | |
| 742 | def create_whl( |
| 743 | wheel, # type: Union[str, InstallableWheel] |
| 744 | destination, # type: str |
| 745 | compile=False, # type: bool |
| 746 | use_system_time=False, # type: bool |
| 747 | override_wheel_file_name=None, # type: Optional[str] |
| 748 | ): |
| 749 | # type: (...) -> str |
| 750 | |
| 751 | if not isinstance(wheel, InstallableWheel) and zipfile.is_zipfile(wheel): |
| 752 | wheel_dst = os.path.join(destination, os.path.basename(wheel)) |
| 753 | safe_copy(wheel, wheel_dst) |
| 754 | return wheel_dst |
| 755 | |
| 756 | wheel_to_create = ( |
| 757 | wheel if isinstance(wheel, InstallableWheel) else InstallableWheel.from_whl(wheel) |
| 758 | ) |
| 759 | whl_file_name = override_wheel_file_name or wheel_to_create.wheel_file_name |
| 760 | whl_chroot = os.path.join(safe_mkdtemp(prefix="pex_create_whl."), whl_file_name) |
| 761 | install_wheel( |
| 762 | wheel_to_create, |
| 763 | InstallPaths.wheel(destination=whl_chroot, wheel=wheel_to_create), |
| 764 | compile=compile, |
| 765 | install_entry_point_scripts=False, |
| 766 | ) |
| 767 | record_data = Wheel.load(whl_chroot).metadata_files.read("RECORD") |
| 768 | if record_data is None: |
| 769 | raise AssertionError(reportable_unexpected_error_msg()) |
| 770 | |
| 771 | wheel_path = os.path.join(destination, whl_file_name) |
| 772 | with open_zip(wheel_path, "w") as zip_fp: |
| 773 | if use_system_time and wheel_to_create.zip_metadata: |
| 774 | for zip_entry_info in wheel_to_create.zip_metadata: |
| 775 | src = os.path.join(whl_chroot, zip_entry_info.filename) |
| 776 | if not os.path.exists(src): |
| 777 | production_assert( |
| 778 | zip_entry_info.is_dir, |
| 779 | "The wheel entry {filename} is unexpectedly missing from {source}.", |
| 780 | filename=zip_entry_info.filename, |
| 781 | source=wheel_to_create.source, |
| 782 | ) |
| 783 | safe_mkdir(src) |
| 784 | zip_fp.write_ex( |
| 785 | src, |
| 786 | zip_entry_info.filename, |
| 787 | date_time=zip_entry_info.date_time_as_struct_time(), |
| 788 | file_mode=zip_entry_info.external_attr_as_stat_mode(), |
| 789 | ) |
| 790 | else: |
| 791 | for installed_file in _read_record_lines(record_data.decode("utf-8").splitlines()): |
| 792 | path = ( |
| 793 | installed_file.dir_info.path |
| 794 | if isinstance(installed_file, InstalledDirectory) |
| 795 | else installed_file.path |
| 796 | ) |
| 797 | src = os.path.join(whl_chroot, path) |
| 798 | if not os.path.exists(src): |
| 799 | production_assert( |
no test coverage detected