Establish a connection with the event pull socket Default timeout is 1 s
(self, timeout=1)
| 476 | self.cpub = False |
| 477 | |
| 478 | def connect_pull(self, timeout=1): |
| 479 | """ |
| 480 | Establish a connection with the event pull socket |
| 481 | Default timeout is 1 s |
| 482 | """ |
| 483 | if self.cpush: |
| 484 | return True |
| 485 | |
| 486 | if self._run_io_loop_sync: |
| 487 | if self.pusher is None: |
| 488 | self.pusher = salt.utils.asynchronous.SyncWrapper( |
| 489 | salt.transport.ipc_publish_server, |
| 490 | args=( |
| 491 | self.node, |
| 492 | self.opts, |
| 493 | ), |
| 494 | ) |
| 495 | try: |
| 496 | self.pusher.connect(timeout=timeout) |
| 497 | self.cpush = True |
| 498 | except tornado.iostream.StreamClosedError as exc: |
| 499 | log.debug("Unable to connect pusher: %s", exc) |
| 500 | except Exception as exc: # pylint: disable=broad-except |
| 501 | log.error( |
| 502 | "Unable to connect pusher: %s", |
| 503 | exc, |
| 504 | exc_info_on_loglevel=logging.DEBUG, |
| 505 | ) |
| 506 | else: |
| 507 | if self.pusher is None: |
| 508 | self.pusher = salt.transport.ipc_publish_server( |
| 509 | self.node, |
| 510 | self.opts, |
| 511 | ) |
| 512 | # For the asynchronous case, the connect will be deferred to when |
| 513 | # fire_event() is invoked. |
| 514 | self.cpush = True |
| 515 | return self.cpush |
| 516 | |
| 517 | def close_pull(self): |
| 518 | """ |