(msgs *[]*sarama.ProducerMessage, rowsIgnored *int, tableName string, shardID int32, columnNames []string, rows []client.Row, updateModes ...memCom.ColumnUpdateMode)
| 158 | } |
| 159 | |
| 160 | func (kp *KafkaPublisher) buildKafkaMessage(msgs *[]*sarama.ProducerMessage, rowsIgnored *int, tableName string, shardID int32, columnNames []string, rows []client.Row, |
| 161 | updateModes ...memCom.ColumnUpdateMode) { |
| 162 | bytes, numRows, err := kp.UpsertBatchBuilder.PrepareUpsertBatch(tableName, columnNames, updateModes, rows) |
| 163 | if err != nil { |
| 164 | kp.Scope.Counter("errors.upsertBatchBuild").Inc(1) |
| 165 | kp.ServiceConfig.Logger.Error("Failed to prepare rows", |
| 166 | zap.String("table", tableName), |
| 167 | zap.Any("columns", columnNames), |
| 168 | zap.Error(err)) |
| 169 | } |
| 170 | |
| 171 | if len(bytes) > 0 { |
| 172 | msg := sarama.ProducerMessage{ |
| 173 | Topic: fmt.Sprintf("ares-redolog-%s-%s", kp.Cluster(), tableName), |
| 174 | Value: sarama.ByteEncoder(bytes), |
| 175 | } |
| 176 | |
| 177 | if shardID >= 0 { |
| 178 | msg.Partition = shardID |
| 179 | } |
| 180 | |
| 181 | *msgs = append(*msgs, &msg) |
| 182 | } |
| 183 | |
| 184 | *rowsIgnored = *rowsIgnored + (len(rows) - numRows) |
| 185 | return |
| 186 | } |
no test coverage detected