(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None)
| 501 | |
| 502 | # Publish worker |
def publisher(self, inner_path, peers, published, limit, diffs=None, event_done=None, cb_progress=None):
    """Publish worker: push the stored copy of ``inner_path`` to peers.

    Peers are popped off ``peers`` one at a time and sent the file through
    ``peer.publish`` until ``peers`` is empty or ``published`` holds at
    least ``limit`` entries. Both lists are mutated in place.

    Args:
        inner_path: Key into ``self.content_manager.contents`` and path
            passed to ``self.storage`` for the file to publish.
        peers: List of candidate peers; consumed with ``pop()``.
        published: List of peers that acknowledged the update; successful
            peers are appended to it.
        limit: Stop once ``len(published)`` reaches this value.
        diffs: Passed through unchanged to ``peer.publish``. Defaults to a
            fresh empty dict per call.
        event_done: Optional object whose ``set(True)`` is called when the
            worker stops.
        cb_progress: Optional callable invoked as
            ``cb_progress(len(published), limit)`` after each success.
    """
    if diffs is None:
        # A literal {} default would be one dict shared by every call.
        diffs = {}

    file_size = self.storage.getSize(inner_path)
    content_json_modified = self.content_manager.contents[inner_path]["modified"]
    body = self.storage.read(inner_path)
    # Loop-invariant share of the timeout: one second per KB of file size.
    size_timeout = int(file_size / 1024)

    while True:
        if not peers or len(published) >= limit:
            if event_done:
                event_done.set(True)
            break  # All peers done, or published enough

        peer = peers.pop()
        if peer in published:
            continue
        if peer.last_content_json_update == content_json_modified:
            self.log.debug("%s already received this update for %s, skipping" % (peer, inner_path))
            continue

        if peer.connection and peer.connection.last_ping_delay:  # Peer connected
            # Timeout: 5sec + size in kb + last_ping
            timeout = 5 + size_timeout + peer.connection.last_ping_delay
        else:  # Peer not connected
            # Timeout: 10sec + size in kb
            timeout = 10 + size_timeout

        # Sentinel kept if the first attempt is cut off by the timeout:
        # gevent.Timeout(..., False) leaves the with-block without raising,
        # so `result` is simply never reassigned.
        result = {"exception": "Timeout"}

        # Up to two attempts. A truthy result (including the untouched
        # timeout sentinel) ends the retries; a falsy result or an
        # exception triggers the second attempt.
        # NOTE(review): if the second attempt times out, `result` still
        # holds the first attempt's value -- confirm this is intended.
        for _ in range(2):
            try:
                with gevent.Timeout(timeout, False):
                    result = peer.publish(self.address, inner_path, body, content_json_modified, diffs)
                if result:
                    break
            except Exception as err:
                formatted_error = Debug.formatException(err)
                self.log.error("Publish error: %s" % formatted_error)
                result = {"exception": formatted_error}

        if result and "ok" in result:
            published.append(peer)
            if cb_progress and len(published) <= limit:
                cb_progress(len(published), limit)
            self.log.info("[OK] %s: %s %s/%s" % (peer.key, result["ok"], len(published), limit))
        else:
            if result == {"exception": "Timeout"}:
                peer.onConnectionError("Publish timeout")
            self.log.info("[FAILED] %s: %s" % (peer.key, result))
        time.sleep(0.01)
| 548 | |
| 549 | # Update content.json on peers |
| 550 | @util.Noparallel() |
nothing calls this directly
no test coverage detected