| 535 | self._connect_kwargs = connect_kwargs |
| 536 | |
| 537 | async def _get_new_connection(self): |
| 538 | con = await self._connect( |
| 539 | *self._connect_args, |
| 540 | loop=self._loop, |
| 541 | connection_class=self._connection_class, |
| 542 | record_class=self._record_class, |
| 543 | **self._connect_kwargs, |
| 544 | ) |
| 545 | if not isinstance(con, self._connection_class): |
| 546 | good = self._connection_class |
| 547 | good_n = f'{good.__module__}.{good.__name__}' |
| 548 | bad = type(con) |
| 549 | if bad.__module__ == "builtins": |
| 550 | bad_n = bad.__name__ |
| 551 | else: |
| 552 | bad_n = f'{bad.__module__}.{bad.__name__}' |
| 553 | raise exceptions.InterfaceError( |
| 554 | "expected pool connect callback to return an instance of " |
| 555 | f"'{good_n}', got " f"'{bad_n}'" |
| 556 | ) |
| 557 | |
| 558 | if self._init is not None: |
| 559 | try: |
| 560 | await self._init(con) |
| 561 | except (Exception, asyncio.CancelledError) as ex: |
| 562 | # If a user-defined `init` function fails, we don't |
| 563 | # know if the connection is safe for re-use, hence |
| 564 | # we close it. A new connection will be created |
| 565 | # when `acquire` is called again. |
| 566 | try: |
| 567 | # Use `close()` to close the connection gracefully. |
| 568 | # An exception in `init` isn't necessarily caused |
| 569 | # by an IO or a protocol error. close() will |
| 570 | # do the necessary cleanup via _release_on_close(). |
| 571 | await con.close() |
| 572 | finally: |
| 573 | raise ex |
| 574 | |
| 575 | return con |
| 576 | |
| 577 | async def execute( |
| 578 | self, |