(self)
| 34 | self.scheduling_thread.start() |
| 35 | |
| 36 | def stop(self): |
| 37 | self.stopped = True |
| 38 | |
| 39 | def stopper(): |
| 40 | pass |
| 41 | |
| 42 | # schedule a no-op event so that the scheduling thread can exit promptly |
| 43 | self.scheduler.enter(1, 0, stopper) |
| 44 | |
| 45 | self.scheduling_thread.join(1) |
| 46 | |
| 47 | def schedule(self, execute_at_datetime, callback, params): |
| 48 | self.scheduler.enterabs(execute_at_datetime.timestamp(), 1, callback, params) |
no outgoing calls
no test coverage detected