(self)
| 169 | ) |
| 170 | |
| 171 | async def test_copy_from_query_to_path(self): |
| 172 | with tempfile.NamedTemporaryFile() as f: |
| 173 | f.close() |
| 174 | await self.con.copy_from_query(''' |
| 175 | SELECT |
| 176 | i, i * 10 |
| 177 | FROM |
| 178 | generate_series(1, 5) AS i |
| 179 | WHERE |
| 180 | i = $1 |
| 181 | ''', 3, output=f.name) |
| 182 | |
| 183 | with open(f.name, 'rb') as fr: |
| 184 | output = fr.read().decode().split('\n') |
| 185 | self.assertEqual( |
| 186 | output, |
| 187 | [ |
| 188 | '3\t30', |
| 189 | '' |
| 190 | ] |
| 191 | ) |
| 192 | |
| 193 | async def test_copy_from_query_to_path_like(self): |
| 194 | with tempfile.NamedTemporaryFile() as f: |
nothing calls this directly
no test coverage detected