| 225 | return link.id |
| 226 | |
| 227 | async def close(self, services): |
| 228 | await self._cleanup_operation(services) |
| 229 | await self._save_new_source(services) |
| 230 | await services.get('event_svc').fire_event( |
| 231 | exchange=Operation.EVENT_EXCHANGE, |
| 232 | queue=Operation.EVENT_QUEUE_COMPLETED, |
| 233 | op=self.id |
| 234 | ) |
| 235 | |
| 236 | if self.state not in [self.states['FINISHED'], self.states['OUT_OF_TIME']]: |
| 237 | self.state = self.states['FINISHED'] |
| 238 | self.finish = self.get_current_timestamp() |
| 239 | |
| 240 | async def wait_for_completion(self): |
| 241 | for member in self.agents: |