MCPcopy
hub / github.com/apache/caldera / Adversary

Class Adversary

app/objects/c_adversary.py:54–111  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

52
53
54class Adversary(FirstClassObjectInterface, BaseObject):
55
56 schema = AdversarySchema()
57
58 @property
59 def unique(self):
60 return self.hash('%s' % self.adversary_id)
61
62 def __init__(self, name='', adversary_id='', description='', atomic_ordering=(), objective='', tags=None, plugin=''):
63 super().__init__()
64 self.adversary_id = adversary_id if adversary_id else str(uuid.uuid4())
65 self.name = name
66 self.description = description
67 self.atomic_ordering = atomic_ordering
68 self.objective = objective or DEFAULT_OBJECTIVE_ID
69 self.tags = set(tags) if tags else set()
70 self.has_repeatable_abilities = False
71 self.plugin = plugin
72
73 def store(self, ram):
74 existing = self.retrieve(ram['adversaries'], self.unique)
75 if not existing:
76 ram['adversaries'].append(self)
77 return self.retrieve(ram['adversaries'], self.unique)
78 existing.update('name', self.name)
79 existing.update('description', self.description)
80 existing.update('atomic_ordering', self.atomic_ordering)
81 existing.update('objective', self.objective)
82 existing.update('tags', self.tags)
83 existing.update('has_repeatable_abilities', self.check_repeatable_abilities(ram['abilities']))
84 existing.update('plugin', self.plugin)
85 return existing
86
87 def verify(self, log, abilities, objectives):
88 for ability_id in self.atomic_ordering:
89 if not next((ability for ability in abilities if ability.ability_id == ability_id), None):
90 log.warning('Ability referenced in adversary %s but not found: %s', self.adversary_id, ability_id)
91
92 if not self.objective:
93 self.objective = DEFAULT_OBJECTIVE_ID
94 elif not next((objective for objective in objectives if objective.id == self.objective), None):
95 log.warning('Objective referenced in adversary %s but not found: %s. Setting default objective.',
96 self.adversary_id, self.objective)
97 self.objective = DEFAULT_OBJECTIVE_ID
98
99 self.has_repeatable_abilities = self.check_repeatable_abilities(abilities)
100
101 def has_ability(self, ability):
102 for a in self.atomic_ordering:
103 if ability == a:
104 return True
105 return False
106
107 async def which_plugin(self):
108 return self.plugin
109
110 def check_repeatable_abilities(self, ability_list):
111 return any(ab.repeatable for ab_id in self.atomic_ordering for ab in ability_list if ab.ability_id == ab_id)

Calls 1

AdversarySchemaClass · 0.85