(ctx context.Context, jobId string, streamId string, reader *streamclient.Reader)
| 812 | } |
| 813 | |
| 814 | func runOutputLoop(ctx context.Context, jobId string, streamId string, reader *streamclient.Reader) { |
| 815 | defer reader.Close() |
| 816 | defer func() { |
| 817 | log.Printf("[job:%s] [stream:%s] output loop finished", jobId, streamId) |
| 818 | }() |
| 819 | |
| 820 | log.Printf("[job:%s] [stream:%s] output loop started", jobId, streamId) |
| 821 | buf := make([]byte, 4096) |
| 822 | for { |
| 823 | n, err := reader.Read(buf) |
| 824 | currentStreamId, _ := jobStreamIds.GetEx(jobId) |
| 825 | if currentStreamId != streamId { |
| 826 | log.Printf("[job:%s] [stream:%s] stream superseded by [stream:%s], exiting output loop", jobId, streamId, currentStreamId) |
| 827 | break |
| 828 | } |
| 829 | if n > 0 { |
| 830 | appendErr := handleAppendJobFile(ctx, jobId, JobOutputFileName, buf[:n]) |
| 831 | if appendErr != nil { |
| 832 | log.Printf("[job:%s] error appending data to WaveFS: %v", jobId, appendErr) |
| 833 | } |
| 834 | } |
| 835 | |
| 836 | if err == io.EOF { |
| 837 | log.Printf("[job:%s] stream ended (EOF)", jobId) |
| 838 | updateErr := wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) { |
| 839 | job.StreamDone = true |
| 840 | }) |
| 841 | if updateErr != nil { |
| 842 | log.Printf("[job:%s] error updating job stream status: %v", jobId, updateErr) |
| 843 | } |
| 844 | tryTerminateJobManager(ctx, jobId) |
| 845 | break |
| 846 | } |
| 847 | |
| 848 | if err != nil { |
| 849 | log.Printf("[job:%s] stream error: %v", jobId, err) |
| 850 | streamErr := err.Error() |
| 851 | updateErr := wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) { |
| 852 | job.StreamDone = true |
| 853 | job.StreamError = streamErr |
| 854 | }) |
| 855 | if updateErr != nil { |
| 856 | log.Printf("[job:%s] error updating job stream error: %v", jobId, updateErr) |
| 857 | } |
| 858 | tryTerminateJobManager(ctx, jobId) |
| 859 | break |
| 860 | } |
| 861 | } |
| 862 | } |
| 863 | |
| 864 | func HandleCmdJobExited(ctx context.Context, jobId string, data wshrpc.CommandJobCmdExitedData) error { |
| 865 | var updatedJob *waveobj.Job |
no test coverage detected