(ctx context.Context, connName string)
| 1169 | } |
| 1170 | |
| 1171 | func ReconnectJobsForConn(ctx context.Context, connName string) error { |
| 1172 | isConnected, err := conncontroller.IsConnected(connName) |
| 1173 | if err != nil { |
| 1174 | return fmt.Errorf("error checking connection status: %w", err) |
| 1175 | } |
| 1176 | if !isConnected { |
| 1177 | return fmt.Errorf("connection %q is not connected", connName) |
| 1178 | } |
| 1179 | |
| 1180 | allJobs, err := wstore.DBGetAllObjsByType[*waveobj.Job](ctx, waveobj.OType_Job) |
| 1181 | if err != nil { |
| 1182 | return fmt.Errorf("failed to get jobs: %w", err) |
| 1183 | } |
| 1184 | |
| 1185 | var jobsToReconnect []*waveobj.Job |
| 1186 | for _, job := range allJobs { |
| 1187 | if job.Connection == connName && isJobManagerRunning(job) { |
| 1188 | jobsToReconnect = append(jobsToReconnect, job) |
| 1189 | } |
| 1190 | } |
| 1191 | |
| 1192 | log.Printf("[conn:%s] found %d jobs to reconnect", connName, len(jobsToReconnect)) |
| 1193 | |
| 1194 | for _, job := range jobsToReconnect { |
| 1195 | err = ReconnectJob(ctx, job.OID, nil) |
| 1196 | if err != nil { |
| 1197 | log.Printf("[job:%s] error reconnecting: %v", job.OID, err) |
| 1198 | } |
| 1199 | } |
| 1200 | |
| 1201 | return nil |
| 1202 | } |
| 1203 | |
| 1204 | func restartStreaming(ctx context.Context, jobId string, knownConnected bool, rtOpts *waveobj.RuntimeOpts) error { |
| 1205 | job, err := wstore.DBMustGet[*waveobj.Job](ctx, jobId) |
no test coverage detected