(self)
| 191 | ) |
| 192 | |
| 193 | async def test_copy_from_query_to_path_like(self): |
| 194 | with tempfile.NamedTemporaryFile() as f: |
| 195 | f.close() |
| 196 | |
| 197 | class Path: |
| 198 | def __init__(self, path): |
| 199 | self.path = path |
| 200 | |
| 201 | def __fspath__(self): |
| 202 | return self.path |
| 203 | |
| 204 | await self.con.copy_from_query(''' |
| 205 | SELECT |
| 206 | i, i * 10 |
| 207 | FROM |
| 208 | generate_series(1, 5) AS i |
| 209 | WHERE |
| 210 | i = $1 |
| 211 | ''', 3, output=Path(f.name)) |
| 212 | |
| 213 | with open(f.name, 'rb') as fr: |
| 214 | output = fr.read().decode().split('\n') |
| 215 | self.assertEqual( |
| 216 | output, |
| 217 | [ |
| 218 | '3\t30', |
| 219 | '' |
| 220 | ] |
| 221 | ) |
| 222 | |
| 223 | async def test_copy_from_query_to_bad_output(self): |
| 224 | with self.assertRaisesRegex(TypeError, 'output is expected to be'): |
nothing calls this directly
no test coverage detected