(self)
| 159 | del self._items[href] |
| 160 | |
def _write(self):
    """Serialize every cached item and write the result to ``self.path``.

    If an etag was recorded earlier and the file's current etag differs
    from it, ``exceptions.PreconditionFailed`` is raised and nothing is
    written.

    Once the write has been attempted, whether it succeeded or raised,
    the cached items and the remembered etag are both reset to ``None``.
    """
    known_etag = self._last_etag
    if known_etag is not None:
        # Only consult the file when there is a recorded etag to compare.
        if known_etag != get_etag_from_file(self.path):
            message = (
                'Some other program modified the file {!r}. Re-run the '
                'synchronization and make sure absolutely no other program is '
                'writing into the same file.'
            ).format(self.path)
            raise exceptions.PreconditionFailed(message)

    # Each cache value is an (item, etag) pair; only the raw item text is
    # passed on to be joined.
    raw_items = (entry.raw for entry, _etag in self._items.values())
    payload = join_collection(raw_items)

    try:
        with atomic_write(self.path, mode='wb', overwrite=True) as handle:
            handle.write(payload.encode(self.encoding))
    finally:
        # Drop the cached state regardless of how the write went.
        self._items = self._last_etag = None
| 178 | |
| 179 | @contextlib.contextmanager |
| 180 | def at_once(self): |
no test coverage detected