| 96 | |
| 97 | |
| 98 | async def run_test(test, global_variables): |
| 99 | variables = global_variables.copy() |
| 100 | variables.update(test.get('variables', {})) |
| 101 | |
| 102 | # port from env. variable of default to 8812 |
| 103 | port = int(os.getenv('PGPORT', 8812)) |
| 104 | connection = await asyncpg.connect( |
| 105 | host='localhost', |
| 106 | port=port, |
| 107 | user='admin', |
| 108 | password='quest', |
| 109 | database='qdb' |
| 110 | ) |
| 111 | |
| 112 | # hard-code array types. this is a workaround for asyncpg using |
| 113 | # introspection to determine array types. the introspection query uses recursive CTEs which are not supported |
| 114 | # by QuestDB :-( |
| 115 | # See: https://github.com/MagicStack/asyncpg/discussions/1015 |
| 116 | float8_array = { |
| 117 | 'oid': 1022, |
| 118 | 'elemtype': 701, |
| 119 | 'kind': 'b', |
| 120 | 'name': '_float8', |
| 121 | 'elemtype_name': 'float8', |
| 122 | 'ns': 'pg_catalog', |
| 123 | 'elemdelim': ',', |
| 124 | 'depth': 0, |
| 125 | 'range_subtype': None, |
| 126 | 'attrtypoids': None, |
| 127 | 'basetype': None |
| 128 | } |
| 129 | varchar_array = { |
| 130 | 'oid': 1015, |
| 131 | 'elemtype': 1043, |
| 132 | 'kind': 'b', |
| 133 | 'name': '_varchar', |
| 134 | 'elemtype_name': 'varchar', |
| 135 | 'ns': 'pg_catalog', |
| 136 | 'elemdelim': ',', |
| 137 | 'depth': 0, |
| 138 | 'range_subtype': None, |
| 139 | 'attrtypoids': None, |
| 140 | 'basetype': None |
| 141 | } |
| 142 | connection._protocol.get_settings().register_data_types([float8_array, varchar_array]) |
| 143 | |
| 144 | test_failed = False |
| 145 | try: |
| 146 | # Prepare phase |
| 147 | prepare_steps = test.get('prepare', []) |
| 148 | await execute_steps(prepare_steps, variables, connection) |
| 149 | |
| 150 | # Test steps |
| 151 | test_steps = test.get('steps', []) |
| 152 | await execute_steps(test_steps, variables, connection) |
| 153 | |
| 154 | print(f"Test '{test['name']}' passed.") |
| 155 | |