Tests that unscheduling a watch from within an event handler correctly correctly unregisters emitter and handler without deadlocking.
(observer)
| 64 | |
| 65 | |
| 66 | def test_unschedule_self(observer): |
| 67 | """ |
| 68 | Tests that unscheduling a watch from within an event handler correctly |
| 69 | correctly unregisters emitter and handler without deadlocking. |
| 70 | """ |
| 71 | |
| 72 | class EventHandler(FileSystemEventHandler): |
| 73 | def on_modified(self, event): |
| 74 | observer.unschedule(watch) |
| 75 | unschedule_finished.set() |
| 76 | |
| 77 | unschedule_finished = threading.Event() |
| 78 | watch = observer.schedule(EventHandler(), "") |
| 79 | observer.start() |
| 80 | |
| 81 | (emitter,) = observer.emitters |
| 82 | emitter.queue_event(FileModifiedEvent("")) |
| 83 | |
| 84 | assert unschedule_finished.wait() |
| 85 | assert len(observer.emitters) == 0 |
| 86 | |
| 87 | |
| 88 | def test_schedule_after_unschedule_all(observer): |
nothing calls this directly
no test coverage detected
searching dependent graphs…