MCPcopy Index your code
hub / github.com/apache/airflow / execute_callable

Method execute_callable

airflow/operators/python.py:353–393  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

351 return super().execute(context=serializable_context)
352
def execute_callable(self):
    """Run ``self.python_callable`` through a script executed by a virtualenv interpreter.

    Everything lives inside one temporary directory (prefix ``venv``): the
    virtualenv itself, the serialized arguments, the string arguments, the
    generated script and the file the script writes its result to.

    :return: whatever ``self._read_result`` produces from the output file
    """
    with TemporaryDirectory(prefix='venv') as venv_dir:
        # Forward templates_dict to the callable via op_kwargs.
        # NOTE: this mutates self.op_kwargs in place.
        if self.templates_dict:
            self.op_kwargs['templates_dict'] = self.templates_dict

        # Scratch files exchanged with the child interpreter.
        args_path = os.path.join(venv_dir, 'script.in')
        result_path = os.path.join(venv_dir, 'script.out')
        string_args_path = os.path.join(venv_dir, 'string_args.txt')
        script_path = os.path.join(venv_dir, 'script.py')

        # Only name a specific interpreter when a version was requested.
        if self.python_version:
            interpreter = f'python{self.python_version}'
        else:
            interpreter = None

        prepare_virtualenv(
            venv_directory=venv_dir,
            python_bin=interpreter,
            system_site_packages=self.system_site_packages,
            requirements=self.requirements,
        )

        self._write_args(args_path)
        self._write_string_args(string_args_path)

        # Values handed to the script template; the callable travels as
        # dedented source text together with its name.
        template_context = {
            'op_args': self.op_args,
            'op_kwargs': self.op_kwargs,
            'pickling_library': self.pickling_library.__name__,
            'python_callable': self.python_callable.__name__,
            'python_callable_source': dedent(inspect.getsource(self.python_callable)),
        }
        write_python_script(jinja_context=template_context, filename=script_path)

        # Run the generated script with the virtualenv's own interpreter.
        command = [
            f'{venv_dir}/bin/python',
            script_path,
            args_path,
            result_path,
            string_args_path,
        ]
        execute_in_subprocess(cmd=command)

        return self._read_result(result_path)
394
395 def _write_args(self, filename):
396 if self.op_args or self.op_kwargs:

Callers

nothing calls this directly

Calls 7

_write_args Method · 0.95
_write_string_args Method · 0.95
_read_result Method · 0.95
prepare_virtualenv Function · 0.90
write_python_script Function · 0.90
execute_in_subprocess Function · 0.90
TemporaryDirectory Function · 0.85

Tested by

no test coverage detected