(
self,
update_dict: Dict,
**kwargs,
)
| 74 | return chunk |
| 75 | |
| 76 | async def update( |
| 77 | self, |
| 78 | update_dict: Dict, |
| 79 | **kwargs, |
| 80 | ) -> ModelEntity: |
| 81 | from app.services.retrieval.embedding import embed_documents |
| 82 | |
| 83 | # handle kwargs |
| 84 | self._check_kwargs(object_id_required=None, **kwargs) |
| 85 | collection_id = kwargs["collection_id"] |
| 86 | chunk_id = kwargs["chunk_id"] |
| 87 | |
| 88 | request = ChunkUpdateRequest(**update_dict) |
| 89 | content = request.content |
| 90 | metadata = request.metadata |
| 91 | collection = await collection_ops.get(collection_id=collection_id) |
| 92 | chunk = await self.get(collection_id=collection_id, chunk_id=chunk_id) |
| 93 | |
| 94 | num_tokens, embedding = None, None |
| 95 | |
| 96 | if content: |
| 97 | # Get model |
| 98 | embedding_model = await model_ops.get(model_id=collection.embedding_model_id) |
| 99 | |
| 100 | # embed the document |
| 101 | embeddings = await embed_documents( |
| 102 | documents=[content], |
| 103 | embedding_model=embedding_model, |
| 104 | embedding_size=collection.embedding_size, |
| 105 | ) |
| 106 | embedding = embeddings[0] |
| 107 | |
| 108 | # update chunk |
| 109 | num_tokens = default_tokenizer.count_tokens(content) |
| 110 | |
| 111 | # update chunk |
| 112 | await db_chunk.update_chunk( |
| 113 | collection=collection, |
| 114 | chunk=chunk, |
| 115 | content=content, |
| 116 | embedding=embedding, |
| 117 | num_tokens=num_tokens, |
| 118 | metadata=metadata, |
| 119 | ) |
| 120 | |
| 121 | # get the updated chunk |
| 122 | chunk = await chunk_ops.get(collection_id=collection_id, chunk_id=chunk_id) |
| 123 | return chunk |
| 124 | |
| 125 | async def delete(self, **kwargs) -> None: |
| 126 | # handle kwargs |
no test coverage detected