MCPcopy
hub / github.com/pika/pika / test_on_closeok

Method test_on_closeok

tests/unit/channel_tests.py:1369–1393  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1367 header_value.properties, body_value)
1368
def test_on_closeok(self):
    """Verify a user-initiated close finishes when ``CloseOk`` is handled.

    The channel is put in the OPEN state and closed with reply code 200
    and text 'All is well'; it must then be CLOSING and remember that
    reason. After a ``Channel.CloseOk`` method frame is passed to
    ``_on_closeok``, the test expects the channel to report closed, the
    '_on_channel_close' callback to have been processed with a
    ``ChannelClosedByClient`` carrying the same code and text, and
    ``_cleanup`` to have been invoked exactly once.
    """
    channel = self.obj
    channel._set_state(channel.OPEN)
    # Wrap rather than replace, so the real _cleanup still runs while the
    # mock records how many times it was called.
    channel._cleanup = mock.Mock(wraps=channel._cleanup)

    # Close from user
    channel.close(200, 'All is well')
    closing_reason = channel._closing_reason
    self.assertEqual((closing_reason.reply_code, closing_reason.reply_text),
                     (200, 'All is well'))
    self.assertEqual(channel._state, channel.CLOSING)

    # Hand the channel the CloseOk frame for its own channel number.
    close_ok_frame = frame.Method(channel.channel_number,
                                  spec.Channel.CloseOk())
    channel._on_closeok(close_ok_frame)

    self.assertTrue(channel.is_closed,
                    'Channel was not closed; state=%s' % (channel._state,))

    process = channel.callbacks.process
    process.assert_any_call(channel.channel_number,
                            '_on_channel_close',
                            channel, channel, mock.ANY)
    # The close reason is the fifth positional argument of the first
    # recorded process() call.
    first_call_positional = process.call_args_list[0][0]
    reason = first_call_positional[4]
    self.assertIsInstance(reason, exceptions.ChannelClosedByClient)
    self.assertEqual(reason.reply_code, 200)
    self.assertEqual(reason.reply_text, 'All is well')

    self.assertEqual(channel._cleanup.call_count, 1)
1394
1395 def test_on_closeok_following_close_from_broker(self):
1396 self.obj._set_state(self.obj.OPEN)

Callers

nothing calls this directly

Calls 3

_set_stateMethod · 0.80
_on_closeokMethod · 0.80
closeMethod · 0.45

Tested by

no test coverage detected