| 333 | return self |
| 334 | |
def __exit__(self, *args):
    """Stop the collector first, then stop our own processing.

    A ``stop`` payload is published up to 300 times, waiting on
    ``self.collector.stop_running`` after each publish. If the collector
    never confirms, the test is failed through ``pytest.fail``.

    ``*args`` are forwarded unchanged to ``self.collector.__exit__``.
    """
    max_attempts = 300
    for _ in range(max_attempts):
        # Publish a payload to tell the collector it's done processing
        self.publish({"tgt_type": "glob", "tgt": "*", "jid": -1, "stop": True})
        collector_confirmed = self.collector.stop_running.wait(1)
        if collector_confirmed is True:
            break
        # Not confirmed yet; pause briefly before publishing again
        time.sleep(0.5)
    else:
        # Every attempt was used without the collector confirming
        pytest.fail("Failed to confirm the collector has stopped")

    # Now trigger the collector to also exit
    self.collector.__exit__(*args)
    # Waiting without a timeout is safe here: the Collector instance has a
    # hard timeout set, so eventually Collector.stopped will be set
    self.collector.stopped.wait()

    # Stop our own processing by queueing ``None``, then wait at most
    # 10 secs for that ``None`` to be processed
    self.queue.put(None)
    self.stopped.wait(10)

    self.close()
    self.terminate()