Load services.py or services.yaml from agents_dir for custom service registration. If services.yaml or services.yml is found, it will be loaded first, followed by services.py if it exists. Skip if neither services.yaml/yml nor services.py is not found.
(agents_dir: str)
| 178 | |
| 179 | |
| 180 | def load_services_module(agents_dir: str) -> None: |
| 181 | """Load services.py or services.yaml from agents_dir for custom service registration. |
| 182 | |
| 183 | If services.yaml or services.yml is found, it will be loaded first, |
| 184 | followed by services.py if it exists. |
| 185 | |
| 186 | Skip if neither services.yaml/yml nor services.py is not found. |
| 187 | """ |
| 188 | if not os.path.isdir(agents_dir): |
| 189 | logger.debug( |
| 190 | "agents_dir %s is not a valid directory, skipping service loading.", |
| 191 | agents_dir, |
| 192 | ) |
| 193 | return |
| 194 | if agents_dir not in sys.path: |
| 195 | sys.path.insert(0, agents_dir) |
| 196 | |
| 197 | # Try loading services.yaml or services.yml first |
| 198 | for yaml_file in ["services.yaml", "services.yml"]: |
| 199 | yaml_path = os.path.join(agents_dir, yaml_file) |
| 200 | if os.path.exists(yaml_path): |
| 201 | try: |
| 202 | config = yaml_utils.load_yaml_file(yaml_path) |
| 203 | _register_services_from_yaml_config(config, get_service_registry()) |
| 204 | logger.debug( |
| 205 | "Loaded custom services from %s in %s.", yaml_file, agents_dir |
| 206 | ) |
| 207 | except Exception as e: |
| 208 | logger.warning( |
| 209 | "Failed to load %s from %s: %s", |
| 210 | yaml_file, |
| 211 | agents_dir, |
| 212 | e, |
| 213 | ) |
| 214 | return # If yaml exists but fails to load, stop. |
| 215 | |
| 216 | try: |
| 217 | importlib.import_module("services") |
| 218 | logger.debug( |
| 219 | "Loaded services.py from %s for custom service registration.", |
| 220 | agents_dir, |
| 221 | ) |
| 222 | except ModuleNotFoundError: |
| 223 | logger.debug("services.py not found in %s, skipping.", agents_dir) |
| 224 | except Exception as e: |
| 225 | logger.warning( |
| 226 | "Failed to load services.py from %s: %s", |
| 227 | agents_dir, |
| 228 | e, |
| 229 | ) |
| 230 | |
| 231 | |
| 232 | _service_registry_instance: ServiceRegistry | None = None |
no test coverage detected