MCPcopy Index your code
hub / github.com/riverqueue/river / JobCompleteTx

Function JobCompleteTx

job_complete_tx.go:28–85  ·  view source on GitHub ↗

JobCompleteTx marks the job as completed as part of transaction tx. If tx is rolled back, the completion will be as well. The function needs to know the type of the River database driver, which is the same as the one in use by Client, but the other generic parameters can be inferred. An invocation

(ctx context.Context, tx TTx, job *Job[TArgs])

Source from the content-addressed store, hash-verified

26//
27// Returns the updated, completed job.
28func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs]) (*Job[TArgs], error) {
29 if job.State != rivertype.JobStateRunning {
30 return nil, errors.New("job must be running")
31 }
32
33 client := ClientFromContext[TTx](ctx)
34 if client == nil {
35 return nil, errors.New("client not found in context, can only work within a River worker")
36 }
37
38 driver := client.Driver()
39 pilot := client.Pilot()
40
41 // extract metadata updates from context
42 metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
43 hasMetadataUpdates = hasMetadataUpdates && len(metadataUpdates) > 0
44 var (
45 metadataUpdatesBytes []byte
46 err error
47 )
48 if hasMetadataUpdates {
49 metadataUpdatesBytes, err = json.Marshal(metadataUpdates)
50 if err != nil {
51 return nil, err
52 }
53 }
54
55 execTx := driver.UnwrapExecutor(tx)
56 params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.Now(), nil)
57 rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{
58 ID: []int64{params.ID},
59 Attempt: []*int{params.Attempt},
60 ErrData: [][]byte{params.ErrData},
61 FinalizedAt: []*time.Time{params.FinalizedAt},
62 MetadataDoMerge: []bool{hasMetadataUpdates},
63 MetadataUpdates: [][]byte{metadataUpdatesBytes},
64 ScheduledAt: []*time.Time{params.ScheduledAt},
65 Schema: client.config.Schema,
66 State: []rivertype.JobState{params.State},
67 })
68 if err != nil {
69 return nil, err
70 }
71 if len(rows) == 0 {
72 if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
73 panic("to use JobCompleteTx in a rivertest.Worker, the job must be inserted into the database first")
74 }
75
76 return nil, rivertype.ErrNotFound
77 }
78 updatedJob := &Job[TArgs]{JobRow: rows[0]}
79
80 if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
81 return nil, err
82 }
83
84 return updatedJob, nil
85}

Callers 4

WorkMethod · 0.92
TestWorker_WorkJobFunction · 0.92
TestJobCompleteTxFunction · 0.85

Calls 8

JobSetStateCompletedFunction · 0.92
DriverMethod · 0.80
PilotMethod · 0.80
UnwrapExecutorMethod · 0.65
NowMethod · 0.65
ValueMethod · 0.45

Tested by 4

WorkMethod · 0.74
TestWorker_WorkJobFunction · 0.74
TestJobCompleteTxFunction · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…