| 180 | |
| 181 | |
def write_output(
    path: str,
    segments: List[Segment],
    output_format: OutputFormat,
    segment_key: str = 'text'
):
    """Write transcription segments to ``path`` in the requested format.

    Args:
        path: Destination file path. The file is opened in ``"w"`` mode with
            UTF-8 encoding, so any existing content is replaced.
        segments: Segments to write. Each one must expose ``start``, ``end``
            and the attribute named by ``segment_key``.
        output_format: One of ``OutputFormat.TXT``, ``OutputFormat.VTT`` or
            ``OutputFormat.SRT``. No branch handles any other value, so the
            file is left empty in that case.
        segment_key: Name of the segment attribute holding the text to write.

    For TXT output, a blank line is inserted between two consecutive segments
    when the gap between them is at least ``BUZZ_PARAGRAPH_SPLIT_TIME``
    (environment variable, default ``2000``; same unit as ``segment.start`` /
    ``segment.end``, which are passed to ``to_timestamp`` as milliseconds).
    """
    logging.debug(
        "Writing transcription output, path = %s, output format = %s, number of segments = %s",
        path,
        output_format,
        len(segments),
    )

    with open(os.fsencode(path), "w", encoding="utf-8") as file:
        if output_format == OutputFormat.TXT:
            default_split_time = 2000
            raw_split_time = os.getenv(
                "BUZZ_PARAGRAPH_SPLIT_TIME", str(default_split_time)
            )
            try:
                paragraph_split_time = int(raw_split_time)
            except ValueError:
                # A malformed override must not abort the export (the file
                # has already been truncated at this point); use the default.
                logging.warning(
                    "Invalid BUZZ_PARAGRAPH_SPLIT_TIME value %r, using %s",
                    raw_split_time,
                    default_split_time,
                )
                paragraph_split_time = default_split_time

            # Collect the pieces and join once instead of growing a string
            # with += on every segment.
            pieces = []
            previous_end_time = None
            for segment in segments:
                if (
                    previous_end_time is not None
                    and (segment.start - previous_end_time) >= paragraph_split_time
                ):
                    pieces.append("\n\n")
                # Every segment is followed by a single space, including the
                # last one, which keeps the output format unchanged.
                pieces.append(getattr(segment, segment_key).strip() + " ")
                previous_end_time = segment.end

            file.write("".join(pieces))

        elif output_format == OutputFormat.VTT:
            file.write("WEBVTT\n\n")
            for segment in segments:
                file.write(
                    f"{to_timestamp(segment.start)} --> {to_timestamp(segment.end)}\n"
                )
                file.write(f"{getattr(segment, segment_key)}\n\n")

        elif output_format == OutputFormat.SRT:
            # SRT cues are numbered from 1 and use "," before the milliseconds.
            for i, segment in enumerate(segments):
                file.write(f"{i + 1}\n")
                file.write(
                    f'{to_timestamp(segment.start, ms_separator=",")} --> {to_timestamp(segment.end, ms_separator=",")}\n'
                )
                file.write(f"{getattr(segment, segment_key)}\n\n")

    logging.debug("Written transcription output")
| 227 | |
| 228 | |
| 229 | def to_timestamp(ms: float, ms_separator=".") -> str: |