MCPcopy Index your code
hub / github.com/HelloZeroNet/ZeroNet / checkTasks

Method checkTasks

src/Worker/WorkerManager.py:37–114  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

35
36 # Check expired tasks
37 def checkTasks(self):
38 while self.running:
39 tasks = task = worker = workers = None # Cleanup local variables
40 announced = False
41 time.sleep(15) # Check every 15 sec
42
43 # Clean up workers
44 for worker in list(self.workers.values()):
45 if worker.task and worker.task["done"]:
46 worker.skip() # Stop workers with task done
47
48 if not self.tasks:
49 continue
50
51 tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
52 num_tasks_started = len([task for task in tasks if task["time_started"]])
53
54 self.log.debug(
55 "Tasks: %s, started: %s, bad files: %s, total started: %s" %
56 (len(tasks), num_tasks_started, len(self.site.bad_files), self.started_task_num)
57 )
58
59 for task in tasks:
60 if task["time_started"] and time.time() >= task["time_started"] + 60:
61 self.log.debug("Timeout, Skipping: %s" % task) # Task taking too long time, skip it
62 # Skip to next file workers
63 workers = self.findWorkers(task)
64 if workers:
65 for worker in workers:
66 worker.skip()
67 else:
68 self.failTask(task)
69
70 elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left
71 self.log.debug("Timeout, Cleanup task: %s" % task)
72 # Remove task
73 self.failTask(task)
74
75 elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
76 # Find more workers: Task started more than 15 sec ago or no workers
77 workers = self.findWorkers(task)
78 self.log.debug(
79 "Slow task: %s, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" %
80 (
81 task["inner_path"], len(workers), task["optional_hash_id"],
82 len(task["peers"] or []), len(task["failed"]), len(self.asked_peers)
83 )
84 )
85 if not announced:
86 task["site"].announce(mode="more") # Find more peers
87 announced = True
88 if task["optional_hash_id"]:
89 if self.workers:
90 if not task["time_started"]:
91 ask_limit = 20
92 else:
93 ask_limit = max(10, time.time() - task["time_started"])
94 if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2:

Callers

nothing calls this directly

Calls 8

findWorkers · Method · 0.95
failTask · Method · 0.95
startFindOptional · Method · 0.95
startWorkers · Method · 0.95
getMaxWorkers · Method · 0.95
values · Method · 0.80
skip · Method · 0.80
announce · Method · 0.45

Tested by

no test coverage detected