(self, docs: List[Document])
| 227 | return final_docs |
| 228 | |
def split_para_sentence(self, docs: List[Document]) -> List[Document]:
    """Repeatedly split oversized documents until a pass makes no progress.

    Each pass over the current chunks:

    * drops chunks whose content is blank,
    * keeps chunks whose token count is at most
      ``1.3 * self.config.chunk_size`` unchanged,
    * sends every other chunk through ``_split_para_sentence_once``.

    Passes repeat until one changes nothing. ``add_window_ids`` is then
    called on the resulting list, which is returned.

    Args:
        docs: Documents to split.

    Returns:
        The chunks produced by the last pass that made progress (``docs``
        itself when the very first pass changes nothing).
    """
    chunks = docs
    while True:
        un_splittables = 0
        # Track progress explicitly. Comparing list lengths is not enough:
        # dropping one blank chunk while splitting another in two leaves
        # the length unchanged even though the list changed.
        made_progress = False
        split_chunks = []
        for c in chunks:
            if c.content.strip() == "":
                made_progress = True  # blank chunk is dropped
                continue
            if self.num_tokens(c.content) <= 1.3 * self.config.chunk_size:
                # small chunk: no need to split
                split_chunks.append(c)
                continue
            splits = self._split_para_sentence_once([c])
            if len(splits) == 1:
                # A single piece back is counted as "could not be split".
                un_splittables += 1
            else:
                made_progress = True
            split_chunks += splits
        if not made_progress:
            if un_splittables > 0:
                max_len = max(self.num_tokens(p.content) for p in chunks)
                logger.warning(
                    f"""
                    Unable to split {un_splittables} chunks
                    using chunk_size = {self.config.chunk_size}.
                    Max chunk size is {max_len} tokens.
                    """
                )
            break  # we won't be able to shorten them with current settings
        # split_chunks is rebuilt from scratch each pass, so no copy needed.
        chunks = split_chunks

    self.add_window_ids(chunks)
    return chunks
| 259 | |
| 260 | def _split_para_sentence_once(self, docs: List[Document]) -> List[Document]: |
| 261 | final_chunks = [] |
no test coverage detected