MCPcopy
hub / github.com/sartography/SpiffWorkflow / TaskIterator

Class TaskIterator

SpiffWorkflow/util/task.py:179–267  ·  view source on GitHub ↗

Default task iteration class.

Source from the content-addressed store, hash-verified

177
178
179class TaskIterator:
180 """Default task iteration class."""
181
182 def __init__(self, task, end_at_spec=None, max_depth=1000, depth_first=True, task_filter=None, **kwargs):
183 """Iterate over the task tree and return the tasks matching the filter parameters.
184
185 Args:
186 task (`Task`): the task to start from
187
188 Keyword Args:
189 end_at (str):stop when a task spec with this name is reached
190 max_depth (int): stop when this depth is reached
191 depth_first (bool): return results in depth first order
192 task_filter (`TaskFilter`): return only tasks matching this filter
193
194 Notes:
195 Keyword args not used by this class will be passed into `TaskFilter` if no `task_filter` is provided.
196 This is for convenience (filter values can be used directly from `Workflow.get_tasks`) as well as
197 backwards compatilibity for queries about `TaskState`.
198 """
199 self.task_filter = task_filter or TaskFilter(**kwargs)
200 self.end_at_spec = end_at_spec
201 self.max_depth = max_depth
202 self.depth_first = depth_first
203
204 self.task_list = [task]
205 self.depth = 0
206 # Figure out which states need to be traversed.
207 # Predicted tasks can follow definite tasks but not vice versa; definite tasks can follow finished tasks but not vice versa
208 # The dream is for a child task to always have a lower task state than its parent; currently we have parents that wait for
209 # their children
210 if self.task_filter.state & TaskState.PREDICTED_MASK:
211 self.min_state = TaskState.MAYBE
212 elif self.task_filter.state & TaskState.DEFINITE_MASK:
213 self.min_state = TaskState.FUTURE
214 else:
215 self.min_state = TaskState.COMPLETED
216
217 def __iter__(self):
218 return self
219
220 def __next__(self):
221 task = self._next()
222 while not self.task_filter.matches(task):
223 task = self._next()
224 return task
225
226 def _next(self):
227
228 if not self.task_list:
229 raise StopIteration()
230
231 task = self.task_list.pop(0)
232 if task._children and \
233 task.state >= self.min_state and \
234 self.depth < self.max_depth and \
235 task.task_spec.name != self.end_at_spec:
236

Callers 14

_update_hookMethod · 0.90
_branch_may_mergeMethod · 0.90
_branch_is_completeMethod · 0.90
_find_tasksMethod · 0.90
_update_hookMethod · 0.90
testTreeMethod · 0.90
is_completedMethod · 0.85
manual_input_requiredMethod · 0.85
get_tasks_iteratorMethod · 0.85
update_waiting_tasksMethod · 0.85

Calls

no outgoing calls

Tested by 1

testTreeMethod · 0.72