(ctx context.Context, job *waveobj.Job)
| 987 | } |
| 988 | |
| 989 | func remoteTerminateJobManager(ctx context.Context, job *waveobj.Job) error { |
| 990 | log.Printf("[job:%s] terminating job manager", job.OID) |
| 991 | |
| 992 | shouldWrite := jobTerminationMessageWritten.TestAndSet(job.OID, true, func(val bool, exists bool) bool { |
| 993 | return !exists || !val |
| 994 | }) |
| 995 | if shouldWrite { |
| 996 | resetTerminalState(ctx, job.AttachedBlockId) |
| 997 | writeMutedMessageToTerminal(job.AttachedBlockId, "[shell terminated]") |
| 998 | } |
| 999 | |
| 1000 | if job.JobManagerStatus == JobManagerStatus_Done { |
| 1001 | log.Printf("[job:%s] job manager already marked as done, skipping termination", job.OID) |
| 1002 | return nil |
| 1003 | } |
| 1004 | |
| 1005 | bareRpc := wshclient.GetBareRpcClient() |
| 1006 | terminateData := wshrpc.CommandRemoteTerminateJobManagerData{ |
| 1007 | JobId: job.OID, |
| 1008 | JobManagerPid: job.JobManagerPid, |
| 1009 | JobManagerStartTs: job.JobManagerStartTs, |
| 1010 | } |
| 1011 | |
| 1012 | rpcOpts := &wshrpc.RpcOpts{ |
| 1013 | Route: wshutil.MakeConnectionRouteId(job.Connection), |
| 1014 | Timeout: 5000, |
| 1015 | } |
| 1016 | |
| 1017 | err := wshclient.RemoteTerminateJobManagerCommand(bareRpc, terminateData, rpcOpts) |
| 1018 | if err != nil { |
| 1019 | log.Printf("[job:%s] error terminating job manager: %v", job.OID, err) |
| 1020 | return fmt.Errorf("failed to terminate job manager: %w", err) |
| 1021 | } |
| 1022 | |
| 1023 | var updatedJob *waveobj.Job |
| 1024 | updateErr := wstore.DBUpdateFn(ctx, job.OID, func(job *waveobj.Job) { |
| 1025 | job.JobManagerStatus = JobManagerStatus_Done |
| 1026 | job.JobManagerDoneReason = JobDoneReason_Terminated |
| 1027 | job.TerminateOnReconnect = false |
| 1028 | if !job.StreamDone { |
| 1029 | job.StreamDone = true |
| 1030 | job.StreamError = "job manager terminated" |
| 1031 | } |
| 1032 | updatedJob = job |
| 1033 | }) |
| 1034 | if updateErr != nil { |
| 1035 | log.Printf("[job:%s] error updating job status after termination: %v", job.OID, updateErr) |
| 1036 | } else { |
| 1037 | sendBlockJobStatusEventByJob(ctx, updatedJob) |
| 1038 | } |
| 1039 | |
| 1040 | telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ |
| 1041 | Event: "job:done", |
| 1042 | Props: telemetrydata.TEventProps{ |
| 1043 | JobDoneReason: JobDoneReason_Terminated, |
| 1044 | JobKind: job.JobKind, |
| 1045 | }, |
| 1046 | }) |
no test coverage detected