MCPcopy Index your code
hub / github.com/apache/caldera / Operation

Class Operation

app/objects/c_operation.py:90–583  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

88
89
90class Operation(FirstClassObjectInterface, BaseObject):
91 EVENT_EXCHANGE = 'operation'
92 EVENT_QUEUE_STATE_CHANGED = 'state_changed'
93 EVENT_QUEUE_COMPLETED = 'completed'
94
95 schema = OperationSchema()
96
97 @property
98 def unique(self):
99 return self.hash('%s' % self.id)
100
101 @property
102 def states(self):
103 return {state.name: state.value for state in self.States}
104
105 @classmethod
106 def get_states(cls):
107 return [state.value for state in cls.States]
108
109 @classmethod
110 def get_finished_states(cls):
111 return [cls.States.OUT_OF_TIME.value, cls.States.FINISHED.value, cls.States.CLEANUP.value]
112
113 @property
114 def state(self):
115 return self._state
116
117 @state.setter
118 def state(self, value):
119 previous_state = getattr(self, '_state', NO_PREVIOUS_STATE)
120
121 self._state = value
122
123 if previous_state is NO_PREVIOUS_STATE:
124 return
125
126 if previous_state == value:
127 return
128
129 self._emit_state_change_event(
130 from_state=previous_state,
131 to_state=value
132 )
133
134 def __init__(self, name, adversary=None, agents=None, id='', jitter='2/8', source=None, planner=None,
135 state='running', autonomous=True, obfuscator='plain-text', group=None, auto_close=True, visibility=50,
136 access=None, use_learning_parsers=True):
137 super().__init__()
138 self.id = str(id) if id else str(uuid.uuid4())
139 self.start, self.finish = None, None
140 self.base_timeout = 180
141 self.link_timeout = 30
142 self.name = name
143 self.group = group
144 self.agents = agents if agents else []
145 self.untrusted_agents = set()
146 self.adversary = adversary
147 self.jitter = jitter

Calls 1

OperationSchemaClass · 0.85