Gets a value for an Airflow Variable Key :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exists :param deserialize_json: Deserialize the value to a Python dict
(
cls,
key: str,
default_var: Any = __NO_DEFAULT_SENTINEL,
deserialize_json: bool = False,
)
| 112 | |
| 113 | @classmethod |
| 114 | def get( |
| 115 | cls, |
| 116 | key: str, |
| 117 | default_var: Any = __NO_DEFAULT_SENTINEL, |
| 118 | deserialize_json: bool = False, |
| 119 | ) -> Any: |
| 120 | """ |
| 121 | Gets a value for an Airflow Variable Key |
| 122 | |
| 123 | :param key: Variable Key |
| 124 | :param default_var: Default value of the Variable if the Variable doesn't exists |
| 125 | :param deserialize_json: Deserialize the value to a Python dict |
| 126 | """ |
| 127 | var_val = Variable.get_variable_from_secrets(key=key) |
| 128 | if var_val is None: |
| 129 | if default_var is not cls.__NO_DEFAULT_SENTINEL: |
| 130 | return default_var |
| 131 | else: |
| 132 | raise KeyError(f'Variable {key} does not exist') |
| 133 | else: |
| 134 | if deserialize_json: |
| 135 | return json.loads(var_val) |
| 136 | else: |
| 137 | return var_val |
| 138 | |
| 139 | @classmethod |
| 140 | @provide_session |