(self, audio_data)
| 93 | print_colored(f"Initialized prompt buffer with {len(self.prompt_buffer)} chunks", "grey") |
| 94 | |
async def process_audio(self, audio_data):
    """Return the next chunk of output audio for one chunk of input audio.

    While ``self.chunks_until_live`` is positive, the input is ignored and
    the next entry of ``self.prompt_buffer`` is returned after a short
    pacing delay. Once the counter reaches zero, the input is combined with
    the previous model output, passed to
    ``self.model.next_audio_from_audio``, appended to
    ``self.recorded_audio``, and the new model output is returned.

    Args:
        audio_data: NumPy array of input samples; it is handed to
            ``T.from_numpy`` and reshaped to ``(1, 1, -1)``.

    Returns:
        A prompt-buffer entry during replay; otherwise the model output
        converted to a float32 NumPy array on the CPU.
    """
    # Imported locally so this method does not rely on a module-level import.
    import asyncio

    if self.chunks_until_live > 0:
        print_colored(f"Serving from prompt buffer, {self.chunks_until_live} chunks left", "grey")
        # Counts up through the buffer as chunks_until_live counts down.
        # NOTE(review): 8 is presumably the number of chunks per second of
        # replayed audio -- confirm against where prompt_buffer is built.
        chunk = self.prompt_buffer[int(self.replay_seconds * 8) - self.chunks_until_live]
        self.chunks_until_live -= 1

        if self.chunks_until_live == 0:
            print_colored("Switching to live processing mode", "green")

        # Pacing delay between replayed chunks. This is a coroutine, so the
        # delay must be awaited: a blocking time.sleep() here would freeze
        # every other task on the event loop for the duration.
        await asyncio.sleep(0.05)
        return chunk

    audio_tensor = T.from_numpy(audio_data).to(device)
    audio_tensor = audio_tensor.reshape(1, 1, -1)
    # Join the incoming audio with the previous model output along dim 1.
    # NOTE(review): assumes self.next_model_audio is (1, C, N) with the same
    # N as the incoming chunk -- confirm.
    audio_tensor = T.cat([audio_tensor, self.next_model_audio], dim=1)

    with T.autocast(device_type=device, dtype=T.bfloat16), T.inference_mode():
        curr_model_audio = self.model.next_audio_from_audio(
            audio_tensor,
            temps=TEMPS,
        )
    print(f"Recorded audio shape {self.recorded_audio.shape}, audio tensor shape {audio_tensor.shape}")
    # Keep the running recording on the CPU; it grows along the last dim.
    self.recorded_audio = T.cat([self.recorded_audio.cpu(), audio_tensor.squeeze(0).cpu()], dim=-1)

    # The output of this step is concatenated with the next input chunk.
    self.next_model_audio = curr_model_audio

    return curr_model_audio.float().cpu().numpy()
| 122 | |
| 123 | def cleanup(self): |
| 124 | print_colored("Cleaning up audio processor...", "blue") |
no test coverage detected