MCPcopy Index your code
hub / github.com/riverqueue/river / JobSchedule

Method JobSchedule

riverdriver/riversqlite/river_sqlite_driver.go:728–833  ·  view source on GitHub ↗
(ctx context.Context, params *riverdriver.JobScheduleParams)

Source from the content-addressed store, hash-verified

726}
727
728func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) {
729 // This operation diverges the most from the Postgres version out of all the
730 // others by far. The Postgres version is one gigantic query that can't
731 // reproduce here because (1) SQLite doesn't support `UPDATE` in CTEs, and
732 // (2) sqlc's handling around any kind of bulk upsert mechanisms (i.e. via
733 // arrays, which don't exist, or jsonb) is completely non-functional.
734 //
735 // Instead, we break the operation up into four much smaller queries. Each
736 // query is mostly trivial, so this does have the effect of simplifying
737 // things conceptually by quite a lot. It is more Go code though, and one
738 // could argue that it's not as concurrently safe as the single query
739 // version (i.e. an advisory lock should maybe be taken around when checking
740 // rows to avoid non-repeatable read anomalies), but since only one writer
741 // at a time is possible for SQLite, this should be okay.
742 return dbutil.WithTxV(ctx, e, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*riverdriver.JobScheduleResult, error) {
743 ctx = schemaTemplateParam(ctx, params.Schema)
744 dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer}
745
746 eligibleJobs, err := dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{
747 Max: int64(params.Max),
748 Now: timeStringNullable(params.Now),
749 })
750 if err != nil {
751 return nil, interpretError(err)
752 }
753
754 var (
755 discardIDs []int64
756 nonUniqueIDs []int64
757 scheduledResMap = make(map[int64]*riverdriver.JobScheduleResult)
758 )
759
760 for _, eligibleJob := range eligibleJobs {
761 if eligibleJob.UniqueKey == nil {
762 nonUniqueIDs = append(nonUniqueIDs, eligibleJob.ID)
763 continue
764 }
765
766 internal, err := dbsqlc.New().JobScheduleGetCollision(ctx, dbtx, &dbsqlc.JobScheduleGetCollisionParams{
767 ID: eligibleJob.ID,
768 UniqueKey: eligibleJob.UniqueKey,
769 })
770 if err != nil && !errors.Is(err, sql.ErrNoRows) {
771 return nil, interpretError(err)
772 }
773
774 if internal.ID != 0 {
775 discardIDs = append(discardIDs, eligibleJob.ID)
776 continue
777 }
778
779 // We must set available immediately rather than in the batch below
780 // because it's possible for eligible jobs to be unique conflicting
781 // with each other. So when we start this loop two jobs could be
782 // scheduled with no unique collision, but after the first one is
783 // scheduled, then the second one no longer can and is discarded.
784 updatedJobs, err := dbsqlc.New().JobScheduleSetAvailable(ctx, dbtx, []int64{eligibleJob.ID})
785 if err != nil {

Callers

nothing calls this directly

Implementers 3

Executorriverdriver/riverpgxv5/river_pgx_v5_dr
Executorriverdriver/riverdatabasesql/river_dat
Executorriverdriver/riversqlite/river_sqlite_d

Calls 13

WithTxVFunction · 0.92
NewFunction · 0.92
MapFunction · 0.92
timeStringNullableFunction · 0.85
schemaTemplateParamFunction · 0.70
interpretErrorFunction · 0.70
jobRowFromInternalFunction · 0.70
UnwrapTxMethod · 0.65

Tested by

no test coverage detected