MCPcopy
hub / github.com/apache/caldera / add_test_variants

Method add_test_variants

app/utility/base_planning_svc.py:78–137  ·  view source on GitHub ↗

Create a list of all possible links for a given set of templates :param links: :param agent: :param facts: :param rules: :param operation: :param trim_unset_variables: :param trim_missing_requirements: :return: updated list of

(self, links, agent, facts=(), rules=(), operation=None,
                                trim_unset_variables=False, trim_missing_requirements=False)

Source from the content-addressed store, hash-verified

76 return links
77
78 async def add_test_variants(self, links, agent, facts=(), rules=(), operation=None,
79 trim_unset_variables=False, trim_missing_requirements=False):
80 """
81 Create a list of all possible links for a given set of templates
82
83 :param links:
84 :param agent:
85 :param facts:
86 :param rules:
87 :param operation:
88 :param trim_unset_variables:
89 :param trim_missing_requirements:
90 :return: updated list of links
91 """
92 link_variants = []
93 rule_set = RuleSet(rules=rules)
94
95 for link in links:
96 decoded_test = agent.replace(link.command, file_svc=self.get_service('file_svc'))
97 variables = set(x for x in re.findall(self.re_variable, decoded_test) if not self.is_global_variable(x))
98
99 if not link.host:
100 link.host = agent.host # Required to allow host specific parsers to work
101
102 if not variables:
103 # apply_id() modifies link.command so the order of these operations matter
104 link.command = self.encode_string(decoded_test)
105 link.apply_id(agent.host)
106 continue
107
108 relevant_facts = await self._build_relevant_facts(variables, facts)
109 valid_facts = [await self._trim_by_limit(
110 decoded_test,
111 (await rule_set.apply_rules(facts=fact_set))[0]
112 ) for fact_set in relevant_facts]
113 combos = list(itertools.product(*valid_facts))
114
115 if trim_unset_variables:
116 combos = [combo for combo in combos if not await self._has_unset_variables(combo, variables)]
117 if trim_missing_requirements:
118 combos = [combo for combo in combos if not await self._is_missing_requirements(link, combo, operation)]
119
120 for combo in combos:
121 try:
122 copy_test = copy.copy(decoded_test)
123 variant, score, used = await self._build_single_test_variant(copy_test, combo, link.executor.name)
124
125 copy_link = pickle.loads(pickle.dumps(link)) # nosec
126 copy_link.command = self.encode_string(variant)
127 copy_link.score = score
128 copy_link.used.extend(used)
129 copy_link.apply_id(agent.host)
130 link_variants.append(copy_link)
131 except Exception as ex:
132 logging.error('Could not create test variant: %s.\nLink=%s' % (ex, link.__dict__))
133
134 if trim_unset_variables:
135 links = await self.remove_links_with_unset_variables(links)

Calls 14

is_global_variableMethod · 0.95
_build_relevant_factsMethod · 0.95
_trim_by_limitMethod · 0.95
apply_rulesMethod · 0.95
_has_unset_variablesMethod · 0.95
RuleSetClass · 0.90
setFunction · 0.85
replaceMethod · 0.80
get_serviceMethod · 0.80