This example demonstrates a Rill pipeline that fetches users from an API, updates their status to active, and saves them back. Users are fetched concurrently and in batches to reduce the number of API calls.
()
| 60 | // and updates their status to active and saves them back. |
| 61 | // Users are fetched concurrently and in batches to reduce the number of API calls. |
| 62 | func Example_batching() { |
| 63 | ctx, cancel := context.WithCancel(context.Background()) |
| 64 | defer cancel() |
| 65 | |
| 66 | // Convert a slice of user IDs into a stream |
| 67 | ids := rill.FromSlice([]int{ |
| 68 | 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, |
| 69 | 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, |
| 70 | }, nil) |
| 71 | |
| 72 | // Group IDs into batches of 5 |
| 73 | idBatches := rill.Batch(ids, 5, -1) |
| 74 | |
| 75 | // Bulk fetch users from the API |
| 76 | // Concurrency = 3 |
| 77 | userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) { |
| 78 | return mockapi.GetUsers(ctx, ids) |
| 79 | }) |
| 80 | |
| 81 | // Transform the stream of batches back into a flat stream of users |
| 82 | users := rill.Unbatch(userBatches) |
| 83 | |
| 84 | // Activate users. |
| 85 | // Concurrency = 2 |
| 86 | err := rill.ForEach(users, 2, func(u *mockapi.User) error { |
| 87 | if u.IsActive { |
| 88 | fmt.Printf("User %d is already active\n", u.ID) |
| 89 | return nil |
| 90 | } |
| 91 | |
| 92 | u.IsActive = true |
| 93 | err := mockapi.SaveUser(ctx, u) |
| 94 | if err != nil { |
| 95 | return err |
| 96 | } |
| 97 | |
| 98 | fmt.Printf("User saved: %+v\n", u) |
| 99 | return nil |
| 100 | }) |
| 101 | |
| 102 | // Handle errors |
| 103 | fmt.Println("Error:", err) |
| 104 | } |
| 105 | |
| 106 | // This example demonstrates how batching can be used to group similar concurrent database updates into a single query. |
| 107 | // The UpdateUserTimestamp function is used to update the last_active_at column in the users table. Updates are not |