| 13 | |
| 14 | @PluginManager.acceptPlugins |
| 15 | class WorkerManager(object): |
| 16 | |
def __init__(self, site):
    """Initialise the bookkeeping for one site and start the task checker.

    `site` is stored unchanged on the instance; its `address_short`
    attribute is read once here to build the logger name.
    """
    self.site = site
    self.running = True  # checkTasks() keeps looping while this is true

    # Worker registry -- key: "ip:port", value: Worker.Worker
    self.workers = {}
    self.asked_peers = []

    # Task list. Each task is a dict shaped like:
    # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
    #  "optional_hash_id": None, "time_started": None, "time_added": time.time(), "peers": peers,
    #  "priority": 0, "failed": peer_ids}
    self.tasks = []
    self.started_task_num = 0  # Last added task num
    self.time_task_added = 0

    logger_name = "WorkerManager:%s" % self.site.address_short
    self.log = logging.getLogger(logger_name)

    # Spawned last so every attribute above exists before checkTasks runs.
    self.process_taskchecker = gevent.spawn(self.checkTasks)
| 29 | |
| 30 | def __str__(self): |
| 31 | return "WorkerManager %s" % self.site.address_short |
| 32 | |
| 33 | def __repr__(self): |
| 34 | return "<%s>" % self.__str__() |
| 35 | |
| 36 | # Check expired tasks |
| 37 | def checkTasks(self): |
| 38 | while self.running: |
| 39 | tasks = task = worker = workers = None # Cleanup local variables |
| 40 | announced = False |
| 41 | time.sleep(15) # Check every 15 sec |
| 42 | |
| 43 | # Clean up workers |
| 44 | for worker in list(self.workers.values()): |
| 45 | if worker.task and worker.task["done"]: |
| 46 | worker.skip() # Stop workers with task done |
| 47 | |
| 48 | if not self.tasks: |
| 49 | continue |
| 50 | |
| 51 | tasks = self.tasks[:] # Copy it so removing elements wont cause any problem |
| 52 | num_tasks_started = len([task for task in tasks if task["time_started"]]) |
| 53 | |
| 54 | self.log.debug( |
| 55 | "Tasks: %s, started: %s, bad files: %s, total started: %s" % |
| 56 | (len(tasks), num_tasks_started, len(self.site.bad_files), self.started_task_num) |
| 57 | ) |
| 58 | |
| 59 | for task in tasks: |
| 60 | if task["time_started"] and time.time() >= task["time_started"] + 60: |
| 61 | self.log.debug("Timeout, Skipping: %s" % task) # Task taking too long time, skip it |
| 62 | # Skip to next file workers |
| 63 | workers = self.findWorkers(task) |
| 64 | if workers: |
| 65 | for worker in workers: |
| 66 | worker.skip() |
| 67 | else: |
| 68 | self.failTask(task) |
| 69 | |
| 70 | elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left |
| 71 | self.log.debug("Timeout, Cleanup task: %s" % task) |
| 72 | # Remove task |