Loads the module at `script_relative_path` by splitting the path into a script module (the file part) and a package (the folders). `module_path` is temporarily added to `sys.path` for the duration of the import. Optionally, missing imports are ignored by importing a dummy module instead.
(
module_path: str, script_relative_path: str, ignore_missing_imports: bool = False
)
| 31 | |
| 32 | |
def import_script_module(
    module_path: str, script_relative_path: str, ignore_missing_imports: bool = False
) -> ModuleType:
    """Import the script at `script_relative_path` as a dotted module.

    The file extension is dropped and the remaining path components are joined
    with dots, so folders become packages and the file becomes the module.
    Unless `module_path` is already listed in `sys.path`, it is inserted at the
    front for the duration of the import and removed again afterwards.

    Args:
        module_path: Directory to search for the module.
        script_relative_path: Path of the script, relative to `module_path`.
        ignore_missing_imports: When True, the import goes through
            `import_module_with_missing` (missing imports are replaced by a
            dummy module) instead of `import_module`.

    Returns:
        The imported module.

    Raises:
        ValueError: If `script_relative_path` is an absolute path.
    """
    if os.path.isabs(script_relative_path):
        raise ValueError(script_relative_path, f"Not relative path to `{module_path}`")

    # "pkg/sub/script.py" -> "pkg.sub.script"
    module, _ = os.path.splitext(script_relative_path)
    module = ".".join(Path(module).parts)

    # Track the insertion with an explicit flag instead of the truthiness of the
    # path: an empty `module_path` ("" means the current directory) is falsy and
    # would otherwise be left behind in sys.path after the import.
    path_added = module_path not in sys.path
    if path_added:
        # path must be first so we always load our module and not a same-named
        # module found earlier on the search path
        sys.path.insert(0, module_path)
    try:
        logger.info(f"Importing pipeline script from module `{module}` with path `{module_path}`")
        if ignore_missing_imports:
            return import_module_with_missing(module)
        return import_module(module)
    finally:
        # remove the script module path, but only if this call added it
        if path_added:
            sys.path.remove(module_path)
| 62 | |
| 63 | |
| 64 | def import_pipeline_script( |