MCPcopy Index your code
hub / github.com/MagicStack/asyncpg / assertLoopErrorHandlerCalled

Method assertLoopErrorHandlerCalled

asyncpg/_testbase/__init__.py:156–177  ·  view source on GitHub ↗
(self, msg_re: str)

Source from the content-addressed store, hash-verified

154
155 @contextlib.contextmanager
156 def assertLoopErrorHandlerCalled(self, msg_re: str):
157 contexts = []
158
159 def handler(loop, ctx):
160 contexts.append(ctx)
161
162 old_handler = self.loop.get_exception_handler()
163 self.loop.set_exception_handler(handler)
164 try:
165 yield
166
167 for ctx in contexts:
168 msg = ctx.get('message')
169 if msg and re.search(msg_re, msg):
170 return
171
172 raise AssertionError(
173 'no message matching {!r} was logged with '
174 'loop.call_exception_handler()'.format(msg_re))
175
176 finally:
177 self.loop.set_exception_handler(old_handler)
178
179 def loop_exception_handler(self, loop, context):
180 self.__unhandled_exceptions.append(context)

Calls 1

getMethod · 0.80