(ctx context.Context, jobId string, rtOpts *waveobj.RuntimeOpts)
| 1057 | } |
| 1058 | |
| 1059 | func doReconnectJob(ctx context.Context, jobId string, rtOpts *waveobj.RuntimeOpts) error { |
| 1060 | job, err := wstore.DBMustGet[*waveobj.Job](ctx, jobId) |
| 1061 | if err != nil { |
| 1062 | return fmt.Errorf("failed to get job: %w", err) |
| 1063 | } |
| 1064 | |
| 1065 | _, err = CheckJobConnected(ctx, jobId) |
| 1066 | if err == nil { |
| 1067 | log.Printf("[job:%s] already connected, skipping reconnect", jobId) |
| 1068 | return nil |
| 1069 | } |
| 1070 | log.Printf("[job:%s] not connected, proceeding with reconnect: %v", jobId, err) |
| 1071 | |
| 1072 | isConnected, err := conncontroller.IsConnected(job.Connection) |
| 1073 | if err != nil { |
| 1074 | return fmt.Errorf("error checking connection status: %w", err) |
| 1075 | } |
| 1076 | if !isConnected { |
| 1077 | return fmt.Errorf("connection %q is not connected", job.Connection) |
| 1078 | } |
| 1079 | |
| 1080 | if job.TerminateOnReconnect { |
| 1081 | return remoteTerminateJobManager(ctx, job) |
| 1082 | } |
| 1083 | |
| 1084 | if rtOpts == nil { |
| 1085 | rtOpts = &waveobj.RuntimeOpts{ |
| 1086 | TermSize: job.CmdTermSize, |
| 1087 | } |
| 1088 | } |
| 1089 | |
| 1090 | bareRpc := wshclient.GetBareRpcClient() |
| 1091 | |
| 1092 | jobAccessClaims := &wavejwt.WaveJwtClaims{ |
| 1093 | MainServer: true, |
| 1094 | JobId: jobId, |
| 1095 | } |
| 1096 | jobAccessToken, err := wavejwt.Sign(jobAccessClaims) |
| 1097 | if err != nil { |
| 1098 | return fmt.Errorf("failed to generate job access token: %w", err) |
| 1099 | } |
| 1100 | |
| 1101 | reconnectData := wshrpc.CommandRemoteReconnectToJobManagerData{ |
| 1102 | JobId: jobId, |
| 1103 | JobAuthToken: job.JobAuthToken, |
| 1104 | MainServerJwtToken: jobAccessToken, |
| 1105 | JobManagerPid: job.JobManagerPid, |
| 1106 | JobManagerStartTs: job.JobManagerStartTs, |
| 1107 | } |
| 1108 | |
| 1109 | rpcOpts := &wshrpc.RpcOpts{ |
| 1110 | Route: wshutil.MakeConnectionRouteId(job.Connection), |
| 1111 | Timeout: 5000, |
| 1112 | } |
| 1113 | |
| 1114 | log.Printf("[job:%s] sending RemoteReconnectToJobManagerCommand to connection %s", jobId, job.Connection) |
| 1115 | rtnData, err := wshclient.RemoteReconnectToJobManagerCommand(bareRpc, reconnectData, rpcOpts) |
| 1116 | if err != nil { |
no test coverage detected