()
| 118 | reason="Requires a postgres server. Sqlite does not support multiple schemas." |
| 119 | ) |
| 120 | def test_empty_other_schema(): |
| 121 | from sqlalchemy import DDL, Column, Integer, MetaData, Table, create_engine, event |
| 122 | |
| 123 | # Database configurations. |
| 124 | pg_host = "localhost" |
| 125 | pg_port = "5432" |
| 126 | pg_user = "user" |
| 127 | pg_pass = "pass" |
| 128 | pg_db = "db" |
| 129 | db_url = f"postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}" |
| 130 | |
| 131 | # Create an empty table in a different schema. |
| 132 | table_name = "empty_table" |
| 133 | schema_name = "other_schema" |
| 134 | engine = create_engine(db_url) |
| 135 | metadata = MetaData() |
| 136 | table = Table( |
| 137 | table_name, |
| 138 | metadata, |
| 139 | Column("id", Integer, primary_key=True), |
| 140 | Column("col2", Integer), |
| 141 | schema=schema_name, |
| 142 | ) |
| 143 | # Create the schema and the table. |
| 144 | event.listen( |
| 145 | metadata, "before_create", DDL(f"CREATE SCHEMA IF NOT EXISTS {schema_name}") |
| 146 | ) |
| 147 | metadata.create_all(engine) |
| 148 | |
| 149 | # Read the empty table from the other schema. |
| 150 | dask_df = read_sql_table( |
| 151 | table.name, db_url, index_col="id", schema=table.schema, npartitions=1 |
| 152 | ) |
| 153 | |
| 154 | # Validate that the retrieved table is empty. |
| 155 | assert dask_df.index.name == "id" |
| 156 | assert dask_df.col2.dtype == np.dtype("int64") |
| 157 | pd_dataframe = dask_df.compute() |
| 158 | assert pd_dataframe.empty is True |
| 159 | |
| 160 | # Drop the schema and the table. |
| 161 | engine.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE") |
| 162 | |
| 163 | |
| 164 | def test_needs_rational(db): |
nothing calls this directly
no test coverage detected
searching dependent graphs…