The shared code path for all Insert and InsertMany methods. It takes a function that executes the actual insert operation and allows for different implementations of the insert query to be passed in, each mapping their results back to a common result type.
( ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams, execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error), )
| 1967 | // implementations of the insert query to be passed in, each mapping their |
| 1968 | // results back to a common result type. |
| 1969 | func (c *Client[TTx]) insertManyShared( |
| 1970 | ctx context.Context, |
| 1971 | tx riverdriver.ExecutorTx, |
| 1972 | insertParams []*rivertype.JobInsertParams, |
| 1973 | execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error), |
| 1974 | ) ([]*rivertype.JobInsertResult, error) { |
| 1975 | doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { |
| 1976 | for _, params := range insertParams { |
| 1977 | for _, hook := range append( |
| 1978 | c.hookLookupGlobal.ByHookKind(hooklookup.HookKindInsertBegin), |
| 1979 | c.hookLookupByJob.ByJobArgs(params.Args).ByHookKind(hooklookup.HookKindInsertBegin)..., |
| 1980 | ) { |
| 1981 | if err := hook.(rivertype.HookInsertBegin).InsertBegin(ctx, params); err != nil { //nolint:forcetypeassert |
| 1982 | return nil, err |
| 1983 | } |
| 1984 | } |
| 1985 | } |
| 1986 | |
| 1987 | finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams { |
| 1988 | return (*riverdriver.JobInsertFastParams)(params) |
| 1989 | }) |
| 1990 | |
| 1991 | insertResults, err := execute(ctx, finalInsertParams) |
| 1992 | if err != nil { |
| 1993 | return insertResults, err |
| 1994 | } |
| 1995 | |
| 1996 | queues := make([]string, 0, 10) |
| 1997 | for _, params := range insertParams { |
| 1998 | if params.State == rivertype.JobStateAvailable { |
| 1999 | queues = append(queues, params.Queue) |
| 2000 | } |
| 2001 | } |
| 2002 | |
| 2003 | if err = c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil { |
| 2004 | return nil, err |
| 2005 | } |
| 2006 | |
| 2007 | return insertResults, nil |
| 2008 | } |
| 2009 | |
| 2010 | jobInsertMiddleware := c.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert) |
| 2011 | if len(jobInsertMiddleware) > 0 { |
| 2012 | // Wrap middlewares in reverse order so the one defined first is wrapped |
| 2013 | // as the outermost function and is first to receive the operation. |
| 2014 | for i := len(jobInsertMiddleware) - 1; i >= 0; i-- { |
| 2015 | middlewareItem := jobInsertMiddleware[i].(rivertype.JobInsertMiddleware) //nolint:forcetypeassert // capture the current middleware item |
| 2016 | previousDoInner := doInner // Capture the current doInner function |
| 2017 | doInner = func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { |
| 2018 | return middlewareItem.InsertMany(ctx, insertParams, previousDoInner) |
| 2019 | } |
| 2020 | } |
| 2021 | } |
| 2022 | |
| 2023 | return doInner(ctx) |
| 2024 | } |
| 2025 | |
| 2026 | // Validates input parameters for a batch insert operation and generates a set |
no test coverage detected