Insert embeddings into MongoDB. Args: embeddings (List[Dict[str, Any]]): List of embedding records to insert. batch_size (int): Number of records to insert in each batch. Returns: int: Number of embeddings successfully inserted.
(self, embeddings: List[Dict[str, Any]], batch_size: int = 100)
| 172 | return bytes(doc["index_data"]) |
| 173 | |
def insert_embeddings(self, embeddings: List[Dict[str, Any]], batch_size: int = 100) -> int:
    """
    Insert embeddings into MongoDB.

    Each record is turned into a document holding the raw face bytes, the face
    shape, the embedding, SHA-256 hashes of the face and of the embedding, a
    sequence number taken from the ``embedding_id`` counter document and a UTC
    creation timestamp. Documents are then written in unordered batches.

    Args:
        embeddings (List[Dict[str, Any]]): List of embedding records to insert.
            Every record must provide the keys ``face``, ``img_name``,
            ``model_name``, ``detector_backend``, ``aligned``,
            ``l2_normalized`` and ``embedding``.
        batch_size (int): Number of records to insert in each batch. Must be
            at least 1.
    Returns:
        int: Number of embeddings successfully inserted.
    Raises:
        ValueError: If ``embeddings`` is empty or ``batch_size`` is below 1.
        DuplicateEntryError: If a duplicate-key or bulk-write error is raised
            while inserting more than one record.
    """
    if not embeddings:
        raise ValueError("No embeddings to insert.")
    if batch_size < 1:
        # Reject early: a zero step only fails inside range() after sequence
        # numbers were already consumed, and a negative step would silently
        # insert nothing.
        raise ValueError("batch_size must be a positive integer.")

    docs: List[Dict[str, Any]] = []

    for record in embeddings:
        # NOTE(review): `face` is presumably a numpy ndarray (it must offer
        # .shape, .astype, .tobytes and .tolist) - confirm against callers.
        face = record["face"]
        embedding = record["embedding"]

        face_shape = list(face.shape)
        binary_face_data = self.Binary(face.astype(np.float32).tobytes())

        # The embedding is packed as native doubles solely to get a stable
        # byte string for hashing; the stored field keeps the original values.
        embedding_bytes = struct.pack(f"{len(embedding)}d", *embedding)

        face_hash = hashlib.sha256(json.dumps(face.tolist()).encode()).hexdigest()
        embedding_hash = hashlib.sha256(embedding_bytes).hexdigest()

        # Increment the shared counter and read the updated value. With
        # PyMongo, return_document=True corresponds to ReturnDocument.AFTER.
        # The number is consumed even if the later insert fails.
        int_id = self.counters.find_one_and_update(
            {"_id": "embedding_id"}, {"$inc": {"seq": 1}}, upsert=True, return_document=True
        )["seq"]

        docs.append(
            {
                "sequence": int_id,
                "img_name": record["img_name"],
                "face": binary_face_data,
                "face_shape": face_shape,
                "model_name": record["model_name"],
                "detector_backend": record["detector_backend"],
                "aligned": record["aligned"],
                "l2_normalized": record["l2_normalized"],
                "embedding": embedding,
                "face_hash": face_hash,
                "embedding_hash": embedding_hash,
                "created_at": datetime.now(timezone.utc),
            }
        )

    inserted = 0
    try:
        # `i` is the offset of the first document of the current batch.
        for i in range(0, len(docs), batch_size):
            result = self.embeddings.insert_many(docs[i : i + batch_size], ordered=False)
            inserted += len(result.inserted_ids)
    except (self.DuplicateKeyError, self.BulkWriteError) as err:
        if len(docs) == 1:
            # A single duplicate record is tolerated: warn and report the count.
            logger.warn("Duplicate detected for extracted face and embedding.")
            return inserted
        raise DuplicateEntryError(
            f"Duplicate detected for extracted face and embedding in {i}-th batch"
        ) from err

    # Success path: report how many documents were written.
    return inserted
nothing calls this directly
no test coverage detected