(t *testing.T)
| 248 | } |
| 249 | |
| 250 | func Test_CDC_MultiNode(t *testing.T) { |
| 251 | testFn := func(t *testing.T, failRate int) { |
| 252 | node1 := mustNewLeaderNode("node1") |
| 253 | defer node1.Deprovision() |
| 254 | node2 := mustNewNode("node2", false) |
| 255 | defer node2.Deprovision() |
| 256 | if err := node2.Join(node1); err != nil { |
| 257 | t.Fatalf("node failed to join leader: %s", err.Error()) |
| 258 | } |
| 259 | _, err := node2.WaitForLeader() |
| 260 | if err != nil { |
| 261 | t.Fatalf("failed waiting for leader: %s", err.Error()) |
| 262 | } |
| 263 | node3 := mustNewNode("node3", false) |
| 264 | defer node3.Deprovision() |
| 265 | if err := node3.Join(node1); err != nil { |
| 266 | t.Fatalf("node failed to join leader: %s", err.Error()) |
| 267 | } |
| 268 | _, err = node3.WaitForLeader() |
| 269 | if err != nil { |
| 270 | t.Fatalf("failed waiting for leader: %s", err.Error()) |
| 271 | } |
| 272 | |
| 273 | testEndpoint := cdctest.NewHTTPTestServer() |
| 274 | testEndpoint.SetFailRate(failRate) |
| 275 | testEndpoint.Start() |
| 276 | defer testEndpoint.Close() |
| 277 | |
| 278 | // Configure CDC service for each node. |
| 279 | for _, node := range []*Node{node1, node2, node3} { |
| 280 | cdcCluster := cdc.NewCDCCluster(node.Store, node.Cluster, node.Client) |
| 281 | cdcService, err := cdc.NewService(node.ID, node.Dir, cdcCluster, mustCDCConfig(testEndpoint.URL())) |
| 282 | if err != nil { |
| 283 | panic(fmt.Sprintf("failed to create CDC service: %s", err.Error())) |
| 284 | } |
| 285 | node.CDC = cdcService |
| 286 | node.CDC.Start() |
| 287 | node.Store.EnableCDC(node.CDC.C(), nil, false) |
| 288 | } |
| 289 | |
| 290 | node1.CDC.SetLeader(true) |
| 291 | |
| 292 | _, err = node1.Execute(`CREATE TABLE foo (id integer not null primary key, name text)`) |
| 293 | if err != nil { |
| 294 | t.Fatalf("failed to create table: %v", err) |
| 295 | } |
| 296 | _, err = node1.Execute(`INSERT INTO foo (id, name) VALUES (1, 'Alice')`) |
| 297 | if err != nil { |
| 298 | t.Fatalf("failed to insert data: %v", err) |
| 299 | } |
| 300 | _, err = node1.Execute(`UPDATE foo SET name = 'Alice Updated' WHERE id = 1`) |
| 301 | if err != nil { |
| 302 | t.Fatalf("failed to update data: %v", err) |
| 303 | } |
| 304 | _, err = node1.Execute(`DELETE FROM foo WHERE id = 1`) |
| 305 | if err != nil { |
| 306 | t.Fatalf("failed to delete data: %v", err) |
| 307 | } |
nothing calls this directly
no test coverage detected