(ctx context.Context, params *riverdriver.JobScheduleParams)
| 726 | } |
| 727 | |
| 728 | func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { |
| 729 | // This operation diverges the most from the Postgres version out of all the |
| 730 | // others by far. The Postgres version is one gigantic query that we can't |
| 731 | // reproduce here because (1) SQLite doesn't support `UPDATE` in CTEs, and |
| 732 | // (2) sqlc's handling around any kind of bulk upsert mechanisms (i.e. via |
| 733 | // arrays, which don't exist, or jsonb) is completely non-functional. |
| 734 | // |
| 735 | // Instead, we break the operation up into four much smaller queries. Each |
| 736 | // query is mostly trivial, so this does have the effect of simplifying |
| 737 | // things conceptually by quite a lot. It is more Go code though, and one |
| 738 | // could argue that it's not as concurrently safe as the single query |
| 739 | // version (i.e. an advisory lock should maybe be taken around when checking |
| 740 | // rows to avoid non-repeatable read anomalies), but since only one writer |
| 741 | // at a time is possible for SQLite, this should be okay. |
| 742 | return dbutil.WithTxV(ctx, e, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*riverdriver.JobScheduleResult, error) { |
| 743 | ctx = schemaTemplateParam(ctx, params.Schema) |
| 744 | dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer} |
| 745 | |
| 746 | eligibleJobs, err := dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ |
| 747 | Max: int64(params.Max), |
| 748 | Now: timeStringNullable(params.Now), |
| 749 | }) |
| 750 | if err != nil { |
| 751 | return nil, interpretError(err) |
| 752 | } |
| 753 | |
| 754 | var ( |
| 755 | discardIDs []int64 |
| 756 | nonUniqueIDs []int64 |
| 757 | scheduledResMap = make(map[int64]*riverdriver.JobScheduleResult) |
| 758 | ) |
| 759 | |
| 760 | for _, eligibleJob := range eligibleJobs { |
| 761 | if eligibleJob.UniqueKey == nil { |
| 762 | nonUniqueIDs = append(nonUniqueIDs, eligibleJob.ID) |
| 763 | continue |
| 764 | } |
| 765 | |
| 766 | internal, err := dbsqlc.New().JobScheduleGetCollision(ctx, dbtx, &dbsqlc.JobScheduleGetCollisionParams{ |
| 767 | ID: eligibleJob.ID, |
| 768 | UniqueKey: eligibleJob.UniqueKey, |
| 769 | }) |
| 770 | if err != nil && !errors.Is(err, sql.ErrNoRows) { |
| 771 | return nil, interpretError(err) |
| 772 | } |
| 773 | |
| 774 | if internal.ID != 0 { |
| 775 | discardIDs = append(discardIDs, eligibleJob.ID) |
| 776 | continue |
| 777 | } |
| 778 | |
| 779 | // We must set available immediately rather than in the batch below |
| 780 | // because it's possible for eligible jobs to be unique conflicting |
| 781 | // with each other. So when we start this loop two jobs could be |
| 782 | // scheduled with no unique collision, but after the first one is |
| 783 | // scheduled, then the second one no longer can and is discarded. |
| 784 | updatedJobs, err := dbsqlc.New().JobScheduleSetAvailable(ctx, dbtx, []int64{eligibleJob.ID}) |
| 785 | if err != nil { |
nothing calls this directly
no test coverage detected