( # noqa: C901
tmp_dir, dvc, tmp_path_factory, local_remote, mocker
)
| 171 | |
| 172 | |
| 173 | def test_partial_push_n_pull( # noqa: C901 |
| 174 | tmp_dir, dvc, tmp_path_factory, local_remote, mocker |
| 175 | ): |
| 176 | from dvc_objects.fs import generic |
| 177 | |
| 178 | foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] |
| 179 | bar = tmp_dir.dvc_gen({"bar": "bar content"})[0].outs[0] |
| 180 | baz = tmp_dir.dvc_gen({"baz": {"foo": "foo content"}})[0].outs[0] |
| 181 | |
| 182 | # Faulty upload version, failing on foo |
| 183 | original = generic.transfer |
| 184 | odb = dvc.cloud.get_remote_odb("upstream") |
| 185 | |
def unreliable_upload(from_fs, from_info, to_fs, to_info, **kwargs):
    """Stand-in for the captured ``original`` transfer that fails on ``foo``.

    Every (source, destination) pair whose destination resolves to the
    remote object path of ``foo`` is reported through the ``on_error``
    callback and removed from the batch; the remaining pairs are handed
    to ``original`` unchanged.

    ``from_info`` / ``to_info`` may each be a single string or an iterable
    of paths; both are normalised to fresh lists so the caller's
    arguments are never mutated.

    Requires an ``on_error`` keyword argument (KeyError if missing).
    Returns whatever ``original`` returns for the filtered batch.
    """
    on_error = kwargs["on_error"]
    assert on_error

    sources = [from_info] if isinstance(from_info, str) else list(from_info)
    targets = [to_info] if isinstance(to_info, str) else list(to_info)

    # Walk the batch back-to-front so that deleting an entry never shifts
    # the position of an entry we still have to inspect.
    for idx in reversed(range(len(sources))):
        src, dst = sources[idx], targets[idx]
        if os.path.abspath(dst) != os.path.abspath(
            odb.get(foo.hash_info.value).path
        ):
            continue
        if on_error:
            on_error(src, dst, Exception("stop foo"))
        del sources[idx]
        del targets[idx]

    return original(from_fs, sources, to_fs, targets, **kwargs)
| 208 | |
| 209 | mock_upload = mocker.patch.object(generic, "transfer", unreliable_upload) |
| 210 | with pytest.raises(UploadError) as upload_error_info: |
| 211 | dvc.push() |
| 212 | assert upload_error_info.value.amount == 2 |
| 213 | |
| 214 | assert not odb.exists(foo.hash_info.value) |
| 215 | assert odb.exists(bar.hash_info.value) |
| 216 | assert not odb.exists(baz.hash_info.value) |
| 217 | mocker.stop(mock_upload) |
| 218 | |
| 219 | # Push everything and delete local cache |
| 220 | dvc.push() |
| 221 | dvc.cache.local.clear() |
| 222 | |
| 223 | baz._collect_used_dir_cache() |
| 224 | |
| 225 | def unreliable_download(_from_fs, from_info, _to_fs, to_info, **kwargs): |
| 226 | on_error = kwargs["on_error"] |
| 227 | assert on_error |
| 228 | if isinstance(from_info, str): |
| 229 | from_info = [from_info] |
| 230 | if isinstance(to_info, str): |
nothing calls this directly
no test coverage detected