JobCompleteTx marks the job as completed as part of transaction tx. If tx is rolled back, the completion will be as well. The function needs to know the type of the River database driver, which is the same as the one in use by Client, but the other generic parameters can be inferred. An invocation only needs to spell out the driver type parameter and pass the following arguments:
(ctx context.Context, tx TTx, job *Job[TArgs])
| 26 | // |
| 27 | // Returns the updated, completed job. |
| 28 | func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error) { |
| 29 | if job.State != rivertype.JobStateRunning { |
| 30 | return nil, errors.New("job must be running") |
| 31 | } |
| 32 | |
| 33 | client := ClientFromContext[TTx](ctx) |
| 34 | if client == nil { |
| 35 | return nil, errors.New("client not found in context, can only work within a River worker") |
| 36 | } |
| 37 | |
| 38 | driver := client.Driver() |
| 39 | pilot := client.Pilot() |
| 40 | |
| 41 | // extract metadata updates from context |
| 42 | metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) |
| 43 | hasMetadataUpdates = hasMetadataUpdates && len(metadataUpdates) > 0 |
| 44 | var ( |
| 45 | metadataUpdatesBytes []byte |
| 46 | err error |
| 47 | ) |
| 48 | if hasMetadataUpdates { |
| 49 | metadataUpdatesBytes, err = json.Marshal(metadataUpdates) |
| 50 | if err != nil { |
| 51 | return nil, err |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | execTx := driver.UnwrapExecutor(tx) |
| 56 | params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.Now(), nil) |
| 57 | rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{ |
| 58 | ID: []int64{params.ID}, |
| 59 | Attempt: []*int{params.Attempt}, |
| 60 | ErrData: [][]byte{params.ErrData}, |
| 61 | FinalizedAt: []*time.Time{params.FinalizedAt}, |
| 62 | MetadataDoMerge: []bool{hasMetadataUpdates}, |
| 63 | MetadataUpdates: [][]byte{metadataUpdatesBytes}, |
| 64 | ScheduledAt: []*time.Time{params.ScheduledAt}, |
| 65 | Schema: client.config.Schema, |
| 66 | State: []rivertype.JobState{params.State}, |
| 67 | }) |
| 68 | if err != nil { |
| 69 | return nil, err |
| 70 | } |
| 71 | if len(rows) == 0 { |
| 72 | if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker { |
| 73 | panic("to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first") |
| 74 | } |
| 75 | |
| 76 | return nil, rivertype.ErrNotFound |
| 77 | } |
| 78 | updatedJob := &Job[TArgs]{JobRow: rows[0]} |
| 79 | |
| 80 | if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil { |
| 81 | return nil, err |
| 82 | } |
| 83 | |
| 84 | return updatedJob, nil |
| 85 | } |
searching dependent graphs…