(ctx context.Context, jobId string, knownConnected bool, rtOpts *waveobj.RuntimeOpts)
| 1202 | } |
| 1203 | |
| 1204 | func restartStreaming(ctx context.Context, jobId string, knownConnected bool, rtOpts *waveobj.RuntimeOpts) error { |
| 1205 | job, err := wstore.DBMustGet[*waveobj.Job](ctx, jobId) |
| 1206 | if err != nil { |
| 1207 | return fmt.Errorf("failed to get job: %w", err) |
| 1208 | } |
| 1209 | |
| 1210 | termSize := job.CmdTermSize |
| 1211 | if rtOpts != nil && rtOpts.TermSize.Rows > 0 && rtOpts.TermSize.Cols > 0 { |
| 1212 | termSize = rtOpts.TermSize |
| 1213 | err = wstore.DBUpdateFn(ctx, jobId, func(job *waveobj.Job) { |
| 1214 | job.CmdTermSize = termSize |
| 1215 | }) |
| 1216 | if err != nil { |
| 1217 | log.Printf("[job:%s] warning: failed to update termsize in DB: %v", jobId, err) |
| 1218 | } |
| 1219 | } |
| 1220 | |
| 1221 | if !knownConnected { |
| 1222 | isConnected, err := conncontroller.IsConnected(job.Connection) |
| 1223 | if err != nil { |
| 1224 | return fmt.Errorf("error checking connection status: %w", err) |
| 1225 | } |
| 1226 | if !isConnected { |
| 1227 | return fmt.Errorf("connection %q is not connected", job.Connection) |
| 1228 | } |
| 1229 | |
| 1230 | jobConnStatus := GetJobConnStatus(jobId) |
| 1231 | if jobConnStatus != JobConnStatus_Connected { |
| 1232 | return fmt.Errorf("job manager is not connected (status: %s)", jobConnStatus) |
| 1233 | } |
| 1234 | } |
| 1235 | |
| 1236 | var currentSeq int64 = 0 |
| 1237 | var totalGap int64 = 0 |
| 1238 | waveFile, err := filestore.WFS.Stat(ctx, jobId, JobOutputFileName) |
| 1239 | if err == nil { |
| 1240 | currentSeq = waveFile.Size |
| 1241 | totalGap = getMetaInt64(waveFile.Meta, MetaKey_TotalGap) |
| 1242 | currentSeq += totalGap |
| 1243 | } |
| 1244 | |
| 1245 | bareRpc := wshclient.GetBareRpcClient() |
| 1246 | broker := bareRpc.StreamBroker |
| 1247 | readerRouteId := wshclient.GetBareRpcClientRouteId() |
| 1248 | writerRouteId := wshutil.MakeJobRouteId(jobId) |
| 1249 | reader, streamMeta := broker.CreateStreamReaderWithSeq(readerRouteId, writerRouteId, DefaultStreamRwnd, currentSeq) |
| 1250 | jobStreamIds.Set(jobId, streamMeta.Id) |
| 1251 | |
| 1252 | prepareData := wshrpc.CommandJobPrepareConnectData{ |
| 1253 | StreamMeta: *streamMeta, |
| 1254 | Seq: currentSeq, |
| 1255 | TermSize: termSize, |
| 1256 | } |
| 1257 | |
| 1258 | rpcOpts := &wshrpc.RpcOpts{ |
| 1259 | Route: wshutil.MakeJobRouteId(jobId), |
| 1260 | Timeout: 5000, |
| 1261 | } |
no test coverage detected