MCPcopy
hub / github.com/dask/dask / test_empty_other_schema

Function test_empty_other_schema

dask/dataframe/io/tests/test_sql.py:120–161  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

118 reason="Requires a postgres server. Sqlite does not support multiple schemas."
119)
120def test_empty_other_schema():
121 from sqlalchemy import DDL, Column, Integer, MetaData, Table, create_engine, event
122
123 # Database configurations.
124 pg_host = "localhost"
125 pg_port = "5432"
126 pg_user = "user"
127 pg_pass = "pass"
128 pg_db = "db"
129 db_url = f"postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}"
130
131 # Create an empty table in a different schema.
132 table_name = "empty_table"
133 schema_name = "other_schema"
134 engine = create_engine(db_url)
135 metadata = MetaData()
136 table = Table(
137 table_name,
138 metadata,
139 Column("id", Integer, primary_key=True),
140 Column("col2", Integer),
141 schema=schema_name,
142 )
143 # Create the schema and the table.
144 event.listen(
145 metadata, "before_create", DDL(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
146 )
147 metadata.create_all(engine)
148
149 # Read the empty table from the other schema.
150 dask_df = read_sql_table(
151 table.name, db_url, index_col="id", schema=table.schema, npartitions=1
152 )
153
154 # Validate that the retrieved table is empty.
155 assert dask_df.index.name == "id"
156 assert dask_df.col2.dtype == np.dtype("int64")
157 pd_dataframe = dask_df.compute()
158 assert pd_dataframe.empty is True
159
160 # Drop the schema and the table.
161 engine.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
162
163
164def test_needs_rational(db):

Callers

nothing calls this directly

Calls 3

read_sql_tableFunction · 0.90
dtypeMethod · 0.45
computeMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…