(
self,
update_dict: Dict,
**kwargs,
)
| 113 | return record |
| 114 | |
| 115 | async def update( |
| 116 | self, |
| 117 | update_dict: Dict, |
| 118 | **kwargs, |
| 119 | ) -> ModelEntity: |
| 120 | # handle kwargs |
| 121 | self._check_kwargs(object_id_required=None, **kwargs) |
| 122 | collection_id = kwargs["collection_id"] |
| 123 | record_id = kwargs["record_id"] |
| 124 | new_metadata = update_dict.get("metadata") |
| 125 | |
| 126 | collection = await collection_ops.get(collection_id=collection_id) |
| 127 | record: Record = await self.get(collection_id=collection_id, record_id=record_id) |
| 128 | |
| 129 | if record.type == RecordType.FILE: |
| 130 | raise_request_validation_error("Cannot update a file record. Please delete and create a new record.") |
| 131 | |
| 132 | chunk_text_list, num_tokens_list, embeddings, db_content = None, None, None, None |
| 133 | new_type, new_title = None, None |
| 134 | if ( |
| 135 | (update_dict.get("type") is not None) |
| 136 | or (update_dict.get("content") is not None) |
| 137 | or (update_dict.get("title") is not None) |
| 138 | ): |
| 139 | new_type = RecordType(update_dict.get("type", record.type)) |
| 140 | new_title = update_dict.get("title", record.title) |
| 141 | new_content = update_dict.get("content", record.content) if new_type == RecordType.TEXT else None |
| 142 | text_splitter = TextSplitter(**update_dict["text_splitter"]) |
| 143 | |
| 144 | # split content into chunks |
| 145 | chunk_text_list, num_tokens_list, embeddings, db_content = await process_content( |
| 146 | collection=collection, |
| 147 | type=new_type, |
| 148 | title=new_title, |
| 149 | content=new_content, |
| 150 | file_id=update_dict.get("file_id"), |
| 151 | url=update_dict.get("url"), |
| 152 | text_splitter=text_splitter, |
| 153 | max_num_chunks=record.num_chunks + collection.rest_capacity(), |
| 154 | ) |
| 155 | |
| 156 | # update record |
| 157 | await db_record.update_record( |
| 158 | collection=collection, |
| 159 | record=record, |
| 160 | title=new_title, |
| 161 | type=new_type, |
| 162 | content=db_content, |
| 163 | chunk_text_list=chunk_text_list, |
| 164 | chunk_num_tokens_list=num_tokens_list, |
| 165 | chunk_embedding_list=embeddings, |
| 166 | metadata=new_metadata, |
| 167 | ) |
| 168 | |
| 169 | # get the updated record |
| 170 | record = await self.get(collection_id=collection_id, record_id=record_id) |
| 171 | return record |
| 172 |
nothing calls this directly
no test coverage detected