MCPcopy Index your code
hub / github.com/kubernetes-client/python / __search

Method __search

kubernetes/base/dynamic/discovery.py:254–294  ·  view source on GitHub ↗
(self,  parts, resources, reqParams)

Source from the content-addressed store, hash-verified

252 return results
253
254 def __search(self, parts, resources, reqParams):
255 part = parts[0]
256 if part != '*':
257
258 resourcePart = resources.get(part)
259 if not resourcePart:
260 return []
261 elif isinstance(resourcePart, ResourceGroup):
262 if len(reqParams) != 2:
263 raise ValueError("prefix and group params should be present, have %s" % reqParams)
264 # Check if we've requested resources for this group
265 if not resourcePart.resources:
266 prefix, group, version = reqParams[0], reqParams[1], part
267 try:
268 resourcePart.resources = self.get_resources_for_api_version(
269 prefix, group, part, resourcePart.preferred)
270 except NotFoundError:
271 raise ResourceNotFoundError
272
273 self._cache['resources'][prefix][group][version] = resourcePart
274 self.__update_cache = True
275 return self.__search(parts[1:], resourcePart.resources, reqParams)
276 elif isinstance(resourcePart, dict):
277 # In this case parts [0] will be a specified prefix, group, version
278 # as we recurse
279 return self.__search(parts[1:], resourcePart, reqParams + [part] )
280 else:
281 if parts[1] != '*' and isinstance(parts[1], dict):
282 for _resource in resourcePart:
283 for term, value in parts[1].items():
284 if getattr(_resource, term) == value:
285 return [_resource]
286
287 return []
288 else:
289 return resourcePart
290 else:
291 matches = []
292 for key in resources.keys():
293 matches.extend(self.__search([key] + parts[1:], resources, reqParams))
294 return matches
295
296 def __build_search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs):
297 if not group and api_version and '/' in api_version:

Callers 1

searchMethod · 0.95

Calls 3

getMethod · 0.45
itemsMethod · 0.45

Tested by

no test coverage detected