| 12 | |
| 13 | |
| 14 | class ConsumerCreator: |
| 15 | connection: pika.SelectConnection |
| 16 | channel: Channel or None |
| 17 | events_consumer: EventsConsumer |
| 18 | jobs_consumer: JobsConsumer |
| 19 | actions_consumer: ActionsConsumer |
| 20 | scheduler_consumer: SchedulerConsumer |
| 21 | stopping: bool |
| 22 | |
def __init__(self):
    """Reset the state flags and build the consumers.

    The creator starts without a channel and with ``stopping`` cleared,
    then delegates to ``create_consumers``.
    """
    # NOTE(review): self.connection is not assigned here; presumably
    # create_consumers (not visible in this chunk) sets it up -- confirm.
    self.stopping = False
    self.channel = None
    self.create_consumers()
| 27 | |
def on_channel_open(self, channel):
    """Keep the freshly opened channel and create the consumers on it.

    Passed to ``connection.channel`` as its ``on_open_callback``.

    :param channel: the channel object handed over by the callback
    """
    # NOTE(review): the message says "Connection" although this is the
    # channel-open callback; also add_on_channel_close_callback() is not
    # invoked here -- confirm both are intentional.
    self.channel = channel
    logger.info('Connection opened')
    self.setup_consumers()
| 32 | |
def on_channel_closed(self, channel, reason):
    """Invoked by pika when RabbitMQ unexpectedly closes the channel.

    Channels are usually closed if you attempt to do something that
    violates the protocol, such as re-declare an exchange or queue with
    different parameters. In this case, we'll close the connection
    to shutdown the object, unless a shutdown is already in progress.

    :param pika.channel.Channel channel: The closed channel
    :param Exception reason: why the channel was closed
    """
    logger.warning('Channel %i was closed: %s', channel, reason)
    self.channel = None
    if self.stopping:
        return

    connection = self.connection
    # The channel may have been closed *because* the connection is going
    # away; calling close() on a closing/closed connection raises
    # ConnectionWrongStateError inside the callback, so skip it then.
    if connection.is_closing or connection.is_closed:
        return
    connection.close()
| 46 | |
def add_on_channel_close_callback(self):
    """Register ``on_channel_closed`` as the close callback of the channel.

    After this call pika invokes ``on_channel_closed`` whenever the
    current channel is closed.
    """
    logger.info('Adding channel close callback')
    close_handler = self.on_channel_closed
    self.channel.add_on_close_callback(close_handler)
| 53 | |
def setup_consumers(self):
    """Instantiate the four consumers on the current channel and connection.

    Each consumer receives the same ``channel`` and ``connection`` keyword
    arguments; they are created in the order events, actions, jobs,
    scheduler.
    """
    shared = {'channel': self.channel, 'connection': self.connection}
    self.events_consumer = EventsConsumer(**shared)
    self.actions_consumer = ActionsConsumer(**shared)
    self.jobs_consumer = JobsConsumer(**shared)
    self.scheduler_consumer = SchedulerConsumer(**shared)
| 59 | |
def open_channel(self):
    """Request a new channel on the connection.

    ``on_channel_open`` is supplied as the callback to run once the
    channel has been opened.
    """
    opened_handler = self.on_channel_open
    self.connection.channel(on_open_callback=opened_handler)
| 63 | |
def on_connection_open(self, _unused_connection):
    """Open a channel as soon as the connection is established.

    :param _unused_connection: connection object passed by the callback;
        ignored, ``self.connection`` is used instead
    """
    self.open_channel()
| 67 | |
def on_connection_open_error(self, _unused_connection, err):
    """Log a failed connection attempt and stop the ioloop after 5 seconds.

    :param _unused_connection: connection object passed by the callback;
        ignored, ``self.connection`` is used instead
    :param err: the error describing why the connection could not be opened
    """
    logger.error('Connection open failed, reopening in 5 seconds: %s', err)
    # The attribute is ``connection`` (no leading underscore); the previous
    # ``self._connection`` raised AttributeError inside this callback.
    # NOTE(review): this only stops the ioloop -- the actual reconnect is
    # presumably done by whoever runs the loop; confirm against the caller.
    ioloop = self.connection.ioloop
    ioloop.call_later(5, ioloop.stop)
| 71 | |