(self, doc, name, collection=None)
| 645 | return self.gfs.get(file_ids[0]).read() |
| 646 | |
def delete_attachment(self, doc, name, collection=None):
    """Remove the attachment called ``name`` from ``doc`` and delete its file.

    ``doc["_attachments"]`` (defaulting to an empty list when the key is
    absent) is scanned for the first entry whose element ``0`` equals
    ``name``; element ``1`` of that entry is used as the file id.

    Args:
        doc: Mapping that may hold an ``"_attachments"`` list of indexable
            entries.
        name: Attachment name to look for.
        collection: Forwarded unchanged to ``self.update``.

    Raises:
        OperationFailure: If no entry matches ``name``, or the matching
            entry's file id is ``None``.
    """
    entries = doc.get("_attachments", [])

    # Lazily locate the first matching entry; the (None, None) default
    # stands in for "no match" so a single check below covers both cases.
    position, stored_id = next(
        (
            (idx, entry[1])
            for idx, entry in enumerate(entries)
            if entry[0] == name
        ),
        (None, None),
    )
    if stored_id is None:
        raise OperationFailure("Attachment not found: %s" % name)

    # The list is edited in place (it is the same object held by ``doc``)
    # and the record is updated before the stored file itself is deleted.
    del entries[position]
    self.update(doc, {"_attachments": entries}, collection=collection)
    self.gfs.delete(stored_id)
| 659 | |
| 660 | |
| 661 | class MongoTrials(Trials): |
nothing calls this directly
no test coverage detected