| 550 | } |
| 551 | |
| 552 | func (s *server) AudioTransformStream(stream pb.Backend_AudioTransformStreamServer) error { |
| 553 | if s.llm.Locking() { |
| 554 | s.llm.Lock() |
| 555 | defer s.llm.Unlock() |
| 556 | } |
| 557 | |
| 558 | in := make(chan *pb.AudioTransformFrameRequest, 4) |
| 559 | out := make(chan *pb.AudioTransformFrameResponse, 4) |
| 560 | |
| 561 | // Pump incoming frames from the gRPC stream into `in`. EOF closes the |
| 562 | // channel, which signals the backend that the client is done sending. |
| 563 | recvErrCh := make(chan error, 1) |
| 564 | go func() { |
| 565 | defer close(in) |
| 566 | for { |
| 567 | req, err := stream.Recv() |
| 568 | if err != nil { |
| 569 | if errors.Is(err, io.EOF) { |
| 570 | recvErrCh <- nil |
| 571 | return |
| 572 | } |
| 573 | recvErrCh <- err |
| 574 | return |
| 575 | } |
| 576 | select { |
| 577 | case in <- req: |
| 578 | case <-stream.Context().Done(): |
| 579 | recvErrCh <- stream.Context().Err() |
| 580 | return |
| 581 | } |
| 582 | } |
| 583 | }() |
| 584 | |
| 585 | // Pump outgoing frames from `out` to the gRPC stream. The backend closes |
| 586 | // `out` on completion. |
| 587 | sendDone := make(chan error, 1) |
| 588 | go func() { |
| 589 | for resp := range out { |
| 590 | if err := stream.Send(resp); err != nil { |
| 591 | sendDone <- err |
| 592 | // Drain `out` so the backend can finish. |
| 593 | for range out { |
| 594 | } |
| 595 | return |
| 596 | } |
| 597 | } |
| 598 | sendDone <- nil |
| 599 | }() |
| 600 | |
| 601 | backendErr := s.llm.AudioTransformStream(in, out) |
| 602 | sendErr := <-sendDone |
| 603 | recvErr := <-recvErrCh |
| 604 | |
| 605 | if backendErr != nil { |
| 606 | return backendErr |
| 607 | } |
| 608 | if sendErr != nil { |
| 609 | return sendErr |