(self, peers=None, force_num=0, reason="Unknown")
| 181 | |
| 182 | # Start workers to process tasks |
| 183 | def startWorkers(self, peers=None, force_num=0, reason="Unknown"): |
| 184 | if not self.tasks: |
| 185 | return False # No task for workers |
| 186 | max_workers = min(self.getMaxWorkers(), len(self.site.peers)) |
| 187 | if len(self.workers) >= max_workers and not peers: |
| 188 | return False # Workers number already maxed and no starting peers defined |
| 189 | self.log.debug( |
| 190 | "Starting workers (%s), tasks: %s, peers: %s, workers: %s" % |
| 191 | (reason, len(self.tasks), len(peers or []), len(self.workers)) |
| 192 | ) |
| 193 | if not peers: |
| 194 | peers = self.site.getConnectedPeers() |
| 195 | if len(peers) < max_workers: |
| 196 | peers += self.site.getRecentPeers(max_workers * 2) |
| 197 | if type(peers) is set: |
| 198 | peers = list(peers) |
| 199 | |
| 200 | |
| 201 | # Sort by ping |
| 202 | peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999) |
| 203 | |
| 204 | for peer in peers: # One worker for every peer |
| 205 | if peers and peer not in peers: |
| 206 | continue # If peers defined and peer not valid |
| 207 | |
| 208 | if force_num: |
| 209 | worker = self.addWorker(peer, force=True) |
| 210 | force_num -= 1 |
| 211 | else: |
| 212 | worker = self.addWorker(peer) |
| 213 | |
| 214 | if worker: |
| 215 | self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), max_workers)) |
| 216 | |
| 217 | # Find peers for optional hash in local hash tables and add to task peers |
| 218 | def findOptionalTasks(self, optional_tasks, reset_task=False): |
no test coverage detected