hub / github.com/riverqueue/river / Example_resumableCursor

Function Example_resumableCursor

example_resumable_cursor_job_test.go:55–105

Example_resumableCursor demonstrates the use of a resumable cursor step, a step that can store arbitrary JSON state to resume a partially completed loop.
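The worker itself is not shown on this page, so the following is only a rough, standard-library sketch of the general idea behind a resumable cursor: state is serialized as JSON after each item, and a later attempt picks up after the last item recorded. The `cursor` type, the `processFrom` function, and the `failOn` parameter are hypothetical names for illustration and are not part of River's API. The sketch also assumes IDs arrive in ascending order.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// cursor is hypothetical resumable state: the last ID fully processed.
// It is not a River type.
type cursor struct {
	LastID int `json:"last_id"`
}

// processFrom works through ids in order, skipping anything at or before the
// saved cursor. It returns the updated cursor as JSON so that a caller can
// persist it and pass it back in on the next attempt. failOn simulates a
// failure partway through the loop (use 0 for no failure).
func processFrom(ids []int, saved []byte, failOn int) ([]byte, []int, error) {
	var cur cursor
	if len(saved) > 0 {
		if err := json.Unmarshal(saved, &cur); err != nil {
			return saved, nil, err
		}
	}

	var processed []int
	for _, id := range ids {
		if id <= cur.LastID {
			continue // handled by an earlier attempt
		}
		if id == failOn {
			state, _ := json.Marshal(cur)
			return state, processed, errors.New("simulated failure")
		}
		processed = append(processed, id)
		cur.LastID = id
	}

	state, err := json.Marshal(cur)
	return state, processed, err
}

func main() {
	ids := []int{1, 2, 3}

	// First attempt fails on ID 3 but keeps the cursor at ID 2.
	state, done, err := processFrom(ids, nil, 3)
	fmt.Println(done, string(state), err)

	// Second attempt resumes from the saved cursor and only handles ID 3.
	state, done, err = processFrom(ids, state, 0)
	fmt.Println(done, string(state), err)
}
```

In this sketch the second call does not redo IDs 1 and 2, which is the property a resumable cursor is meant to provide. How River persists and restores the state is not covered here.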


// Example_resumableCursor demonstrates the use of a resumable cursor step, a
// step that can store arbitrary JSON state to resume a partially completed loop.
func Example_resumableCursor() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &ResumableCursorWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Schema:   riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
		TestOnly: true, // suitable only for use in tests; remove for live environments
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err = riverClient.Insert(ctx, ResumableCursorArgs{
		IDs: []int{1, 2, 3},
	}, nil); err != nil {
		panic(err)
	}

	// Wait for jobs to complete. Only needed for purposes of the example test.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

	// Output:
	// Processed 1
	// Processed 2
	// Processed 3
}
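The example subscribes to completion events and then blocks until one arrives or a timeout passes. The implementation of `riversharedtest.WaitOrTimeoutN` is not shown here, so the following is only a minimal sketch of that wait pattern using the standard library. The `waitN` helper is a hypothetical name, and its behavior (returning `false` on timeout or on a closed channel) is an assumption rather than a description of River's helper.

```go
package main

import (
	"fmt"
	"time"
)

// waitN is a hypothetical helper (not River's WaitOrTimeoutN). It collects n
// values from ch and reports false if the timeout elapses or the channel is
// closed before n values arrive.
func waitN(ch <-chan string, n int, timeout time.Duration) ([]string, bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	got := make([]string, 0, n)
	for len(got) < n {
		select {
		case v, ok := <-ch:
			if !ok {
				return got, false // channel closed early
			}
			got = append(got, v)
		case <-timer.C:
			return got, false // timed out
		}
	}
	return got, true
}

func main() {
	// A buffered channel stands in for a subscription channel.
	events := make(chan string, 1)
	events <- "job completed"

	got, ok := waitN(events, 1, time.Second)
	fmt.Println(got, ok)
}
```

A single timer covers the whole wait, so the timeout bounds the total time spent rather than the time per event.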

Callers

nothing calls this directly

Calls 13

TestDatabaseURL · Function · 0.92
NewWorkers · Function · 0.92
AddWorker · Function · 0.92
NewClient · Function · 0.92
New · Function · 0.92
TestSchema · Function · 0.92
PanicTB · Function · 0.92
WaitOrTimeoutN · Function · 0.92
Subscribe · Method · 0.80
Insert · Method · 0.80
Close · Method · 0.65
Start · Method · 0.65

Tested by

no test coverage detected
