()
| 84 | } |
| 85 | |
| 86 | func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Numeric() { |
| 87 | tableName := "test_invalid_numeric" |
| 88 | srcTableName := s.attachSchemaSuffix(tableName) |
| 89 | dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) |
| 90 | |
| 91 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 92 | CREATE TABLE IF NOT EXISTS %s ( |
| 93 | id INT PRIMARY KEY, |
| 94 | num1 NUMERIC(100, 50), |
| 95 | num2 NUMERIC(100, 50) |
| 96 | ); |
| 97 | `, srcTableName)) |
| 98 | require.NoError(s.t, err) |
| 99 | |
| 100 | _, err = s.Conn().Exec(s.t.Context(), |
| 101 | fmt.Sprintf("INSERT INTO %s (id, num1, num2) VALUES (1,$1,$2)", srcTableName), |
| 102 | "999999999999999999999999999999999999999", |
| 103 | "9999999999999999") |
| 104 | require.NoError(s.t, err) |
| 105 | |
| 106 | connectionGen := FlowConnectionGenerationConfig{ |
| 107 | FlowJobName: s.attachSuffix(tableName), |
| 108 | TableNameMapping: map[string]string{srcTableName: dstTableName}, |
| 109 | Destination: s.Peer().Name, |
| 110 | } |
| 111 | |
| 112 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 113 | flowConnConfig.DoInitialSnapshot = true |
| 114 | |
| 115 | tc := NewTemporalClient(s.t) |
| 116 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 117 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 118 | |
| 119 | EnvWaitFor(s.t, env, 3*time.Minute, "init", func() bool { |
| 120 | records, err := s.sfHelper.ExecuteAndProcessQuery(s.t.Context(), "select num1, num2 from "+dstTableName+" where id = 1") |
| 121 | if err != nil || len(records.Records) == 0 { |
| 122 | return false |
| 123 | } |
| 124 | return records.Records[0][0].Value() == nil && records.Records[0][1].Value() != nil |
| 125 | }) |
| 126 | |
| 127 | // Fewer 9s this time, but the value is still invalid: fractional (scale) digits count toward precision, e.g. with scale 2, `1` takes 3 digits: `1.00` |
| 128 | _, err = s.Conn().Exec(s.t.Context(), |
| 129 | fmt.Sprintf("INSERT INTO %s (id, num1, num2) VALUES (2,$1,$2)", srcTableName), |
| 130 | "9999999999999999999999999999999999", |
| 131 | "9999999999999999") |
| 132 | EnvNoError(s.t, env, err) |
| 133 | |
| 134 | EnvWaitFor(s.t, env, 3*time.Minute, "cdc", func() bool { |
| 135 | records, err := s.sfHelper.ExecuteAndProcessQuery(s.t.Context(), "select num1, num2 from "+dstTableName+" where id = 2") |
| 136 | if err != nil || len(records.Records) == 0 { |
| 137 | return false |
| 138 | } |
| 139 | return records.Records[0][0].Value() == nil && records.Records[0][1].Value() != nil |
| 140 | }) |
| 141 | |
| 142 | env.Cancel(s.t.Context()) |
| 143 | RequireEnvCanceled(s.t, env) |
nothing calls this directly
no test coverage detected