MCPcopy Index your code
hub / github.com/riverqueue/river / insertManyShared

Method insertManyShared

client.go:1969–2024  ·  view source on GitHub ↗

The shared code path for all Insert and InsertMany methods. It takes a function that executes the actual insert operation and allows for different implementations of the insert query to be passed in, each mapping their results back to a common result type.

(
	ctx context.Context,
	tx riverdriver.ExecutorTx,
	insertParams []*rivertype.JobInsertParams,
	execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error),
)

Source from the content-addressed store, hash-verified

1967// implementations of the insert query to be passed in, each mapping their
1968// results back to a common result type.
1969func (c *Client[TTx]) insertManyShared(
1970 ctx context.Context,
1971 tx riverdriver.ExecutorTx,
1972 insertParams []*rivertype.JobInsertParams,
1973 execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error),
1974) ([]*rivertype.JobInsertResult, error) {
1975 doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
1976 for _, params := range insertParams {
1977 for _, hook := range append(
1978 c.hookLookupGlobal.ByHookKind(hooklookup.HookKindInsertBegin),
1979 c.hookLookupByJob.ByJobArgs(params.Args).ByHookKind(hooklookup.HookKindInsertBegin)...,
1980 ) {
1981 if err := hook.(rivertype.HookInsertBegin).InsertBegin(ctx, params); err != nil { //nolint:forcetypeassert
1982 return nil, err
1983 }
1984 }
1985 }
1986
1987 finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams {
1988 return (*riverdriver.JobInsertFastParams)(params)
1989 })
1990
1991 insertResults, err := execute(ctx, finalInsertParams)
1992 if err != nil {
1993 return insertResults, err
1994 }
1995
1996 queues := make([]string, 0, 10)
1997 for _, params := range insertParams {
1998 if params.State == rivertype.JobStateAvailable {
1999 queues = append(queues, params.Queue)
2000 }
2001 }
2002
2003 if err = c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
2004 return nil, err
2005 }
2006
2007 return insertResults, nil
2008 }
2009
2010 jobInsertMiddleware := c.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert)
2011 if len(jobInsertMiddleware) > 0 {
2012 // Wrap middlewares in reverse order so the one defined first is wrapped
2013 // as the outermost function and is first to receive the operation.
2014 for i := len(jobInsertMiddleware) - 1; i >= 0; i-- {
2015 middlewareItem := jobInsertMiddleware[i].(rivertype.JobInsertMiddleware) //nolint:forcetypeassert // capture the current middleware item
2016 previousDoInner := doInner // Capture the current doInner function
2017 doInner = func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
2018 return middlewareItem.InsertMany(ctx, insertParams, previousDoInner)
2019 }
2020 }
2021 }
2022
2023 return doInner(ctx)
2024}
2025
2026// Validates input parameters for a batch insert operation and generates a set

Callers 2

insertManyMethod · 0.95
insertManyFastMethod · 0.95

Calls 7

MapFunction · 0.92
ByJobArgsMethod · 0.80
ByHookKindMethod · 0.65
InsertBeginMethod · 0.65
ByMiddlewareKindMethod · 0.65
InsertManyMethod · 0.65

Tested by

no test coverage detected