(self)
| 351 | return super().execute(context=serializable_context) |
| 352 | |
def execute_callable(self):
    """Run the callable's source inside a freshly prepared virtualenv.

    A temporary directory (prefix ``venv``) is used both as the virtualenv
    root and as the home of four scratch files: the argument file, the
    result file, the string-args file and the generated script. The
    script is then launched with the interpreter at ``<dir>/bin/python``.

    Side effect: when ``self.templates_dict`` is truthy it is stored in
    ``self.op_kwargs`` under the ``'templates_dict'`` key before anything
    is written out.

    :return: whatever ``self._read_result`` returns for the result file.
    """
    with TemporaryDirectory(prefix='venv') as venv_dir:
        # Must happen before the argument file is written and before the
        # script context is built, since both read ``self.op_kwargs``.
        if self.templates_dict:
            self.op_kwargs['templates_dict'] = self.templates_dict

        # Every scratch file lives directly inside the venv directory.
        args_path, result_path, string_args_path, script_path = (
            os.path.join(venv_dir, leaf)
            for leaf in ('script.in', 'script.out', 'string_args.txt', 'script.py')
        )

        # No explicit version configured -> pass None and let
        # prepare_virtualenv decide which interpreter to use.
        interpreter = None
        if self.python_version:
            interpreter = f'python{self.python_version}'

        prepare_virtualenv(
            venv_directory=venv_dir,
            python_bin=interpreter,
            system_site_packages=self.system_site_packages,
            requirements=self.requirements,
        )

        self._write_args(args_path)
        self._write_string_args(string_args_path)

        # The callable is shipped as dedented source text plus its name,
        # not as a live object.
        script_context = {
            'op_args': self.op_args,
            'op_kwargs': self.op_kwargs,
            'pickling_library': self.pickling_library.__name__,
            'python_callable': self.python_callable.__name__,
            'python_callable_source': dedent(inspect.getsource(self.python_callable)),
        }
        write_python_script(jinja_context=script_context, filename=script_path)

        # Argument order after the script path: input, output, string args.
        command = [
            f'{venv_dir}/bin/python',
            script_path,
            args_path,
            result_path,
            string_args_path,
        ]
        execute_in_subprocess(cmd=command)

        # Read the result while the temporary directory still exists.
        return self._read_result(result_path)
| 394 | |
| 395 | def _write_args(self, filename): |
| 396 | if self.op_args or self.op_kwargs: |
nothing calls this directly
no test coverage detected