ToByteArray produces a serialized UpsertBatch as a byte array.
()
| 384 | |
| 385 | // ToByteArray produces a serialized UpsertBatch as a byte array. |
| 386 | func (u UpsertBatchBuilder) ToByteArray() ([]byte, error) { |
| 387 | // Create buffer. |
| 388 | numCols := len(u.columns) |
| 389 | // The version header takes 4 bytes (version number). |
| 390 | versionHeaderSize := 4 |
| 391 | // 24 bytes consist of fixed headers: |
| 392 | // [int32] num_of_rows (4 bytes) |
| 393 | // [uint16] num_of_columns (2 bytes) |
| 394 | // <reserve 14 bytes> |
| 395 | // [uint32] arrival_time (4 bytes) |
| 396 | fixedHeaderSize := 24 |
| 397 | columnHeaderSize := ColumnHeaderSize(numCols) |
| 398 | headerSize := versionHeaderSize + fixedHeaderSize + columnHeaderSize |
| 399 | size := headerSize |
| 400 | for _, column := range u.columns { |
| 401 | column.CalculateBufferSize(&size) |
| 402 | } |
| 403 | size = utils.AlignOffset(size, 8) |
| 404 | buffer := make([]byte, size) |
| 405 | writer := utils.NewBufferWriter(buffer) |
| 406 | |
| 407 | // Write upsert batch version. |
| 408 | if err := writer.AppendUint32(uint32(V1)); err != nil { |
| 409 | return nil, utils.StackError(err, "Failed to write version number") |
| 410 | } |
| 411 | // Write fixed headers. |
| 412 | if err := writer.AppendInt32(int32(u.NumRows)); err != nil { |
| 413 | return nil, utils.StackError(err, "Failed to write number of rows") |
| 414 | } |
| 415 | if err := writer.AppendUint16(uint16(len(u.columns))); err != nil { |
| 416 | return nil, utils.StackError(err, "Failed to write number of columns") |
| 417 | } |
| 418 | writer.SkipBytes(14) |
| 419 | if err := writer.AppendUint32(uint32(utils.Now().Unix())); err != nil { |
| 420 | return nil, utils.StackError(err, "Failed to write arrival time") |
| 421 | } |
| 422 | columnHeader := NewUpsertBatchHeader(buffer[writer.GetOffset():headerSize], numCols) |
| 423 | // skip to data offset |
| 424 | writer.SkipBytes(columnHeaderSize) |
| 425 | |
| 426 | // Write each column's header fields and its data. |
| 427 | for i, column := range u.columns { |
| 428 | if err := columnHeader.WriteColumnID(column.columnID, i); err != nil { |
| 429 | return nil, err |
| 430 | } |
| 431 | if err := columnHeader.WriteColumnFlag(column.GetMode(), column.updateMode, i); err != nil { |
| 432 | return nil, err |
| 433 | } |
| 434 | if err := columnHeader.WriteColumnType(column.dataType, i); err != nil { |
| 435 | return nil, err |
| 436 | } |
| 437 | if err := columnHeader.WriteColumnOffset(writer.GetOffset(), i); err != nil { |
| 438 | return nil, err |
| 439 | } |
| 440 | if err := column.AppendToBuffer(&writer); err != nil { |
| 441 | return nil, utils.StackError(err, "Failed to write data for column %d", i) |
| 442 | } |
| 443 | if err := columnHeader.WriteColumnOffset(writer.GetOffset(), i+1); err != nil { |