MCPcopy Index your code
hub / github.com/diffgram/diffgram / ConsumerCreator

Class ConsumerCreator

eventhandlers/ConsumersCreator.py:14–158  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

12
13
14class ConsumerCreator:
15 connection: pika.SelectConnection
16 channel: Channel or None
17 events_consumer: EventsConsumer
18 jobs_consumer: JobsConsumer
19 actions_consumer: ActionsConsumer
20 scheduler_consumer: SchedulerConsumer
21 stopping: bool
22
def __init__(self):
    """Reset the channel/stopping state, then run ``create_consumers``."""
    # No channel exists until pika reports one through on_channel_open.
    self.stopping = False
    self.channel = None
    self.create_consumers()
27
def on_channel_open(self, channel):
    """Store the newly opened channel and build the consumers on it.

    Registered as the ``on_open_callback`` in ``open_channel``.

    :param channel: The channel object handed to the callback
    """
    # This callback fires for the channel, not the connection, so the log
    # line names the channel (it previously read 'Connection opened').
    logger.info('Channel opened')
    self.channel = channel
    self.setup_consumers()
32
def on_channel_closed(self, channel, reason):
    """Handle a channel close reported by pika.

    RabbitMQ usually closes a channel after a protocol violation, such as
    re-declaring an exchange or queue with different parameters. The stored
    channel reference is dropped and, unless ``self.stopping`` is set, the
    connection is closed as well.

    :param pika.channel.Channel channel: The closed channel
    :param Exception reason: why the channel was closed
    """
    logger.warning('Channel %i was closed: %s', channel, reason)
    self.channel = None
    if self.stopping:
        # Stop already flagged; leave the connection untouched.
        return
    self.connection.close()
46
def add_on_channel_close_callback(self):
    """Register ``on_channel_closed`` as the close callback of the
    current channel.
    """
    logger.info('Adding channel close callback')
    close_handler = self.on_channel_closed
    self.channel.add_on_close_callback(close_handler)
53
def setup_consumers(self):
    """Instantiate the events, actions, jobs and scheduler consumers.

    Every consumer is given the same channel and connection held by this
    object.
    """
    shared = {'channel': self.channel, 'connection': self.connection}
    # Construction order is kept: events, actions, jobs, scheduler.
    self.events_consumer = EventsConsumer(**shared)
    self.actions_consumer = ActionsConsumer(**shared)
    self.jobs_consumer = JobsConsumer(**shared)
    self.scheduler_consumer = SchedulerConsumer(**shared)
59
def open_channel(self):
    """Request a channel on the connection; ``on_channel_open`` is passed
    as the open callback.
    """
    opened_callback = self.on_channel_open
    self.connection.channel(on_open_callback=opened_callback)
63
def on_connection_open(self, _unused_connection):
    """Connection-open callback: immediately ask for a channel.

    :param _unused_connection: connection passed by the callback; ignored
    """
    self.open_channel()
67
def on_connection_open_error(self, _unused_connection, err):
    """Log a failed connection attempt and schedule the ioloop to stop.

    The ioloop's ``stop`` is scheduled via ``call_later`` with a delay of 5.
    Whatever performs the actual reconnect is not part of this method.

    :param _unused_connection: connection passed by the callback; ignored
    :param err: the error reported for the failed open
    """
    logger.error('Connection open failed, reopening in 5 seconds: %s', err)
    # The attribute is `connection`; the previous `self._connection` is not
    # assigned anywhere in the visible class and raised AttributeError here.
    ioloop = self.connection.ioloop
    ioloop.call_later(5, ioloop.stop)
71

Callers 1

main.pyFile · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected