MCPcopy
hub / github.com/wavetermdev/waveterm / restartStreaming

Function restartStreaming

pkg/jobcontroller/jobcontroller.go:1204–1345  ·  view source on GitHub ↗
(ctx context.Context, jobId string, knownConnected bool, rtOpts *waveobj.RuntimeOpts)

Source from the content-addressed store, hash-verified

1202}
1203
1204func restartStreaming(ctx context.Context, jobId string, knownConnected bool, rtOpts *waveobj.RuntimeOpts) error {
1205 job, err := wstore.DBMustGet[*waveobj.Job](ctx, jobId)
1206 if err != nil {
1207 return fmt.Errorf("failed to get job: %w", err)
1208 }
1209
1210 termSize := job.CmdTermSize
1211 if rtOpts != nil && rtOpts.TermSize.Rows > 0 && rtOpts.TermSize.Cols > 0 {
1212 termSize = rtOpts.TermSize
1213 err = wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) {
1214 job.CmdTermSize = termSize
1215 })
1216 if err != nil {
1217 log.Printf("[job:%s] warning: failed to update termsize in DB: %v", jobId, err)
1218 }
1219 }
1220
1221 if !knownConnected {
1222 isConnected, err := conncontroller.IsConnected(job.Connection)
1223 if err != nil {
1224 return fmt.Errorf("error checking connection status: %w", err)
1225 }
1226 if !isConnected {
1227 return fmt.Errorf("connection %q is not connected", job.Connection)
1228 }
1229
1230 jobConnStatus := GetJobConnStatus(jobId)
1231 if jobConnStatus != JobConnStatus_Connected {
1232 return fmt.Errorf("job manager is not connected (status: %s)", jobConnStatus)
1233 }
1234 }
1235
1236 var currentSeq int64 = 0
1237 var totalGap int64 = 0
1238 waveFile, err := filestore.WFS.Stat(ctx, jobId, JobOutputFileName)
1239 if err == nil {
1240 currentSeq = waveFile.Size
1241 totalGap = getMetaInt64(waveFile.Meta, MetaKey_TotalGap)
1242 currentSeq += totalGap
1243 }
1244
1245 bareRpc := wshclient.GetBareRpcClient()
1246 broker := bareRpc.StreamBroker
1247 readerRouteId := wshclient.GetBareRpcClientRouteId()
1248 writerRouteId := wshutil.MakeJobRouteId(jobId)
1249 reader, streamMeta := broker.CreateStreamReaderWithSeq(readerRouteId, writerRouteId, DefaultStreamRwnd, currentSeq)
1250 jobStreamIds.Set(jobId, streamMeta.Id)
1251
1252 prepareData := wshrpc.CommandJobPrepareConnectData{
1253 StreamMeta: *streamMeta,
1254 Seq: currentSeq,
1255 TermSize: termSize,
1256 }
1257
1258 rpcOpts := &wshrpc.RpcOpts{
1259 Route: wshutil.MakeJobRouteId(jobId),
1260 Timeout: 5000,
1261 }

Callers 1

doReconnectJob · Function · 0.85

Calls 15

DBMustGet · Function · 0.92
DBUpdateFn · Function · 0.92
IsConnected · Function · 0.92
GetBareRpcClient · Function · 0.92
GetBareRpcClientRouteId · Function · 0.92
MakeJobRouteId · Function · 0.92
JobPrepareConnectCommand · Function · 0.92
JobStartStreamCommand · Function · 0.92
PanicHandler · Function · 0.92
GetJobConnStatus · Function · 0.85
getMetaInt64 · Function · 0.85
HandleCmdJobExited · Function · 0.85

Tested by

no test coverage detected