MCPcopy
hub / github.com/wavetermdev/waveterm / runOutputLoop

Function runOutputLoop

pkg/jobcontroller/jobcontroller.go:814–862  ·  view source on GitHub ↗
(ctx context.Context, jobId string, streamId string, reader *streamclient.Reader)

Source from the content-addressed store, hash-verified

812}
813
814func runOutputLoop(ctx context.Context, jobId string, streamId string, reader *streamclient.Reader) {
815 defer reader.Close()
816 defer func() {
817 log.Printf("[job:%s] [stream:%s] output loop finished", jobId, streamId)
818 }()
819
820 log.Printf("[job:%s] [stream:%s] output loop started", jobId, streamId)
821 buf := make([]byte, 4096)
822 for {
823 n, err := reader.Read(buf)
824 currentStreamId, _ := jobStreamIds.GetEx(jobId)
825 if currentStreamId != streamId {
826 log.Printf("[job:%s] [stream:%s] stream superseded by [stream:%s], exiting output loop", jobId, streamId, currentStreamId)
827 break
828 }
829 if n > 0 {
830 appendErr := handleAppendJobFile(ctx, jobId, JobOutputFileName, buf[:n])
831 if appendErr != nil {
832 log.Printf("[job:%s] error appending data to WaveFS: %v", jobId, appendErr)
833 }
834 }
835
836 if err == io.EOF {
837 log.Printf("[job:%s] stream ended (EOF)", jobId)
838 updateErr := wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) {
839 job.StreamDone = true
840 })
841 if updateErr != nil {
842 log.Printf("[job:%s] error updating job stream status: %v", jobId, updateErr)
843 }
844 tryTerminateJobManager(ctx, jobId)
845 break
846 }
847
848 if err != nil {
849 log.Printf("[job:%s] stream error: %v", jobId, err)
850 streamErr := err.Error()
851 updateErr := wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) {
852 job.StreamDone = true
853 job.StreamError = streamErr
854 })
855 if updateErr != nil {
856 log.Printf("[job:%s] error updating job stream error: %v", jobId, updateErr)
857 }
858 tryTerminateJobManager(ctx, jobId)
859 break
860 }
861 }
862}
863
864func HandleCmdJobExited(ctx context.Context, jobId string, data wshrpc.CommandJobCmdExitedData) error {
865 var updatedJob *waveobj.Job

Callers 2

StartJobFunction · 0.85
restartStreamingFunction · 0.85

Calls 7

DBUpdateFnFunction · 0.92
handleAppendJobFileFunction · 0.85
tryTerminateJobManagerFunction · 0.85
GetExMethod · 0.80
CloseMethod · 0.65
ReadMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected