(connection)
| 1155 | be a replica/standby server. |
| 1156 | """ |
async def can_be_used(connection):
    """Tell whether *connection*'s hot-standby state matches the wanted one.

    The ``in_hot_standby`` attribute of the connection's settings is
    consulted first.  When that attribute is missing or ``None``, the
    server is queried with ``pg_catalog.pg_is_in_recovery()`` instead.

    Returns the result of comparing that state with
    ``should_be_in_hot_standby``, a name this function does not define
    itself (presumably supplied by the enclosing function -- verify there).
    """
    reported = getattr(connection.get_settings(), 'in_hot_standby', None)
    if reported is None:
        # Nothing usable in the settings: ask the server directly.
        in_standby = await connection.fetchval(
            "SELECT pg_catalog.pg_is_in_recovery()"
        )
    else:
        # The setting is compared against the literal string 'on'.
        in_standby = reported == 'on'
    return in_standby == should_be_in_hot_standby
| 1167 | |
| 1168 | return can_be_used |
| 1169 |
nothing calls this directly
no test coverage detected
searching dependent graphs…