MCPcopy
hub / github.com/uber/aresdb / buildKafkaMessage

Method buildKafkaMessage

subscriber/common/sink/kafka.go:160–186  ·  view source on GitHub ↗
(msgs *[]*sarama.ProducerMessage, rowsIgnored *int, tableName string, shardID int32, columnNames []string, rows []client.Row,
	updateModes ...memCom.ColumnUpdateMode)

Source from the content-addressed store, hash-verified

158}
159
160func (kp *KafkaPublisher) buildKafkaMessage(msgs *[]*sarama.ProducerMessage, rowsIgnored *int, tableName string, shardID int32, columnNames []string, rows []client.Row,
161 updateModes ...memCom.ColumnUpdateMode) {
162 bytes, numRows, err := kp.UpsertBatchBuilder.PrepareUpsertBatch(tableName, columnNames, updateModes, rows)
163 if err != nil {
164 kp.Scope.Counter("errors.upsertBatchBuild").Inc(1)
165 kp.ServiceConfig.Logger.Error("Failed to prepare rows",
166 zap.String("table", tableName),
167 zap.Any("columns", columnNames),
168 zap.Error(err))
169 }
170
171 if len(bytes) > 0 {
172 msg := sarama.ProducerMessage{
173 Topic: fmt.Sprintf("ares-redolog-%s-%s", kp.Cluster(), tableName),
174 Value: sarama.ByteEncoder(bytes),
175 }
176
177 if shardID >= 0 {
178 msg.Partition = shardID
179 }
180
181 *msgs = append(*msgs, &msg)
182 }
183
184 *rowsIgnored = *rowsIgnored + (len(rows) - numRows)
185 return
186}

Callers 1

SaveMethod · 0.95

Calls 4

ClusterMethod · 0.95
PrepareUpsertBatchMethod · 0.65
ErrorMethod · 0.65
StringMethod · 0.65

Tested by

no test coverage detected