(self, request, job)
| 639 | job.progress_queue.put(None) |
| 640 | |
| 641 | def _do_train(self, request, job): |
| 642 | from liquid_audio import LFM2AudioModel # noqa: F401 (sanity import) |
| 643 | from liquid_audio.data.dataloader import LFM2DataLoader |
| 644 | from liquid_audio.trainer import Trainer |
| 645 | |
| 646 | model_id = request.model or self.model_id or "LiquidAI/LFM2.5-Audio-1.5B" |
| 647 | |
| 648 | dataset_path = request.dataset_source |
| 649 | if not dataset_path: |
| 650 | raise ValueError("dataset_source is required (path to a preprocessed dataset)") |
| 651 | |
| 652 | extras = dict(request.extra_options) if request.extra_options else {} |
| 653 | val_path = extras.get("val_dataset") |
| 654 | |
| 655 | # Map FineTuneRequest hyperparameters to liquid_audio.Trainer constructor args |
| 656 | lr = request.learning_rate or 3e-5 |
| 657 | max_steps = request.max_steps or 1000 |
| 658 | warmup_steps = request.warmup_steps or min(100, max_steps // 10) |
| 659 | batch_size = request.batch_size or 16 |
| 660 | save_interval = request.save_steps or max(1, max_steps // 4) |
| 661 | |
| 662 | output_dir = request.output_dir or os.path.join( |
| 663 | os.environ.get("LIQUID_AUDIO_OUTPUT_DIR", "/tmp"), |
| 664 | f"liquid-audio-{job.job_id}", |
| 665 | ) |
| 666 | os.makedirs(output_dir, exist_ok=True) |
| 667 | |
| 668 | job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( |
| 669 | job_id=job.job_id, status="loading_dataset", |
| 670 | message=f"Loading preprocessed dataset from {dataset_path}", |
| 671 | )) |
| 672 | train_data = LFM2DataLoader(dataset_path) |
| 673 | val_data = LFM2DataLoader(val_path) if val_path else None |
| 674 | |
| 675 | job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( |
| 676 | job_id=job.job_id, status="loading_model", |
| 677 | message=f"Loading base model {model_id}", |
| 678 | )) |
| 679 | |
| 680 | # The Liquid Trainer logs via self.accelerator.print; we subclass it to |
| 681 | # also push progress events onto the queue every logging_interval steps. |
| 682 | progress_q = job.progress_queue |
| 683 | |
| 684 | class QueuedTrainer(Trainer): |
| 685 | def log(self_, model_output): |
| 686 | if self_.step > 0 and self_.step % self_.logging_interval == 0: |
| 687 | try: |
| 688 | loss = self_.accelerator.reduce( |
| 689 | model_output.loss.detach(), reduction="mean" |
| 690 | ).item() |
| 691 | except Exception: |
| 692 | loss = float("nan") |
| 693 | lr_now = self_.optimizer.param_groups[0]["lr"] |
| 694 | pct = (self_.step / self_.max_steps * 100.0) if self_.max_steps else 0.0 |
| 695 | progress_q.put(backend_pb2.FineTuneProgressUpdate( |
| 696 | job_id=job.job_id, |
| 697 | current_step=int(self_.step), |
| 698 | total_steps=int(self_.max_steps), |
no test coverage detected