MCPcopy Index your code
hub / github.com/HelloZeroNet/ZeroNet / WorkerManager

Class WorkerManager

src/Worker/WorkerManager.py:15–567  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

13
14@PluginManager.acceptPlugins
15class WorkerManager(object):
16
17 def __init__(self, site):
18 self.site = site
19 self.workers = {} # Key: ip:port, Value: Worker.Worker
20 self.tasks = []
21 # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
22 # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
23 self.started_task_num = 0 # Last added task num
24 self.asked_peers = []
25 self.running = True
26 self.time_task_added = 0
27 self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
28 self.process_taskchecker = gevent.spawn(self.checkTasks)
29
30 def __str__(self):
31 return "WorkerManager %s" % self.site.address_short
32
33 def __repr__(self):
34 return "<%s>" % self.__str__()
35
36 # Check expired tasks
37 def checkTasks(self):
38 while self.running:
39 tasks = task = worker = workers = None # Cleanup local variables
40 announced = False
41 time.sleep(15) # Check every 15 sec
42
43 # Clean up workers
44 for worker in list(self.workers.values()):
45 if worker.task and worker.task["done"]:
46 worker.skip() # Stop workers with task done
47
48 if not self.tasks:
49 continue
50
51 tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
52 num_tasks_started = len([task for task in tasks if task["time_started"]])
53
54 self.log.debug(
55 "Tasks: %s, started: %s, bad files: %s, total started: %s" %
56 (len(tasks), num_tasks_started, len(self.site.bad_files), self.started_task_num)
57 )
58
59 for task in tasks:
60 if task["time_started"] and time.time() >= task["time_started"] + 60:
61 self.log.debug("Timeout, Skipping: %s" % task) # Task taking too long time, skip it
62 # Skip to next file workers
63 workers = self.findWorkers(task)
64 if workers:
65 for worker in workers:
66 worker.skip()
67 else:
68 self.failTask(task)
69
70 elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left
71 self.log.debug("Timeout, Cleanup task: %s" % task)
72 # Remove task

Callers 1

__init__Method · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected