MCPcopy
hub / github.com/sartography/SpiffWorkflow / TaskFilter

Class TaskFilter

SpiffWorkflow/util/task.py:128–176  ·  view source on GitHub ↗

This is the default class for filtering during task iteration. Note: All filter values must match. Default filter values match any task.

Source from the content-addressed store, hash-verified

126
127
class TaskFilter:
    """Default filter applied to tasks during task iteration.

    Note:
        A task is accepted only when every filter value matches it. The
        default filter values match any task.
    """

    def __init__(
        self,
        state=TaskState.ANY_MASK,
        updated_ts=0,
        manual=None,
        spec_name=None,
        spec_class=None,
    ):
        """
        Parameters:
            state (`TaskState`): state or mask that a task must have
            updated_ts (float): earliest acceptable timestamp of the task's last state change (inclusive)
            manual (bool): required value of the `TaskSpec`'s manual attribute; `None` skips this check
            spec_name (str): required value of the `TaskSpec`'s name; `None` skips this check
            spec_class (`TaskSpec`): class the `TaskSpec` must be an instance of; `None` skips this check
        """
        self.state, self.updated_ts = state, updated_ts
        self.manual, self.spec_name, self.spec_class = manual, spec_name, spec_class

    def matches(self, task):
        """Decide whether a task satisfies every criterion of this filter.

        Args:
            task (`Task`): the task to test

        Returns:
            bool: `True` if all criteria match, `False` otherwise
        """
        # Criteria on the task itself are evaluated first; the task spec is
        # not consulted at all when either of them fails.
        if not (task.has_state(self.state) and task.last_state_change >= self.updated_ts):
            return False

        # A spec criterion left at None is skipped. The chain short-circuits
        # left to right, so later criteria are not evaluated after a mismatch.
        return bool(
            (self.manual is None or task.task_spec.manual == self.manual)
            and (self.spec_name is None or task.task_spec.name == self.spec_name)
            and (self.spec_class is None or isinstance(task.task_spec, self.spec_class))
        )
177
178
179class TaskIterator:

Callers 2

run_nextMethod · 0.85
__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected