MCPcopy
hub / github.com/MagicStack/asyncpg / test_pool_set_connection_args

Method test_pool_set_connection_args

tests/test_pool.py:910–951  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

908 await pool.close()
909
async def test_pool_set_connection_args(self):
    """Check that ``set_connect_args()`` affects connections opened later.

    The pool is created with ``min_size=1, max_size=1``.  Two scenarios
    are exercised, each with its own ``application_name`` so that the
    second one cannot pass merely because of the first:

    1. The connection is checked out while the connect arguments are
       replaced and ``expire_connections()`` is called, then released.
    2. The connection is back in the pool when the connect arguments
       are replaced and ``expire_connections()`` is called.

    In both cases the next acquired connection must report the newly
    configured ``application_name``.
    """

    def spec_with_app_name(name):
        # Start from a fresh connection spec and override only the
        # application name; an existing 'server_settings' mapping is
        # reused, a missing one is created.
        connspec = self.get_connection_spec()
        connspec.setdefault('server_settings', {})['application_name'] = name
        return connspec

    pool = await self.create_pool(database='postgres',
                                  min_size=1, max_size=1)
    try:
        # Test that connection is expired on release.
        con = await pool.acquire()
        try:
            pool.set_connect_args(**spec_with_app_name('set_conn_args_test'))
            await pool.expire_connections()
        finally:
            # Always hand the connection back so that the final
            # pool.close() is not left waiting on it after a failure.
            await pool.release(con)

        con = await pool.acquire()
        try:
            self.assertEqual(con.get_settings().application_name,
                             'set_conn_args_test')
        finally:
            await pool.release(con)

        # Test that connection is expired before acquire.
        # The name must differ from the first scenario in *both* the
        # "server_settings present" and "server_settings missing" cases;
        # previously the former reused 'set_conn_args_test', which
        # contradicted the assertion below.
        pool.set_connect_args(**spec_with_app_name('set_conn_args_test_2'))
        await pool.expire_connections()

        con = await pool.acquire()
        try:
            self.assertEqual(con.get_settings().application_name,
                             'set_conn_args_test_2')
        finally:
            await pool.release(con)
    finally:
        await pool.close()
952
953 async def test_pool_init_race(self):
954 pool = self.create_pool(database='postgres', min_size=1, max_size=1)

Callers

nothing calls this directly

Calls 8

set_connect_argsMethod · 0.80
expire_connectionsMethod · 0.80
get_settingsMethod · 0.80
create_poolMethod · 0.45
acquireMethod · 0.45
get_connection_specMethod · 0.45
releaseMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected