MCPcopy
hub / github.com/sartography/SpiffWorkflow / catch

Method catch

SpiffWorkflow/bpmn/workflow.py:107–132  ·  view source on GitHub ↗

Tasks can always catch events, regardless of their state. The event information is stored in the task's internal data and processed when the task is reached in the workflow. If a task should only receive messages while it is running (eg a boundary event), the task should c

(self, event)

Source from the content-addressed store, hash-verified

105 return [sp for sp in self.subprocesses.values() if not sp.completed]
106
107 def catch(self, event):
108 """
109 Tasks can always catch events, regardless of their state. The event information is stored in the task's
110 internal data and processed when the task is reached in the workflow. If a task should only receive messages
111 while it is running (eg a boundary event), the task should call the event_definition's reset method before
112 executing to clear out a stale message.
113
114 :param event: the thrown event
115 """
116 if event.target is not None:
117 # This limits results to tasks in the specified workflow
118 tasks = event.target.get_tasks(skip_subprocesses=True, state=TaskState.NOT_FINISHED_MASK, catches_event=event)
119 if isinstance(event.event_definition, CodeEventDefinition) and len(tasks) == 0:
120 event.target = event.target.parent_workflow
121 self.catch(event)
122 else:
123 self.update_collaboration(event)
124 tasks = self.get_tasks(state=TaskState.NOT_FINISHED_MASK, catches_event=event)
125 # Figure out if we need to create an external event
126 if len(tasks) == 0:
127 self.bpmn_events.append(event)
128
129 for task in tasks:
130 task.task_spec.catch(task, event)
131 if len(tasks) > 0:
132 self.refresh_waiting_tasks()
133
134 def send_event(self, event):
135 """Allows this workflow to catch an externally generated event."""

Calls 3

update_collaborationMethod · 0.95
refresh_waiting_tasksMethod · 0.95
get_tasksMethod · 0.80