* * Users should modify this function according to different demands. */
()
| 110 | * Users should modify this function according to different demands. |
| 111 | */ |
| 112 | func (er *ExampleReplayer) handler() { |
| 113 | for msg := range er.pendingQueue { |
| 114 | count := uint64(len(msg.message.RawLogs)) |
| 115 | if count == 0 { |
| 116 | // probe request |
| 117 | continue |
| 118 | } |
| 119 | |
| 120 | // parse batched message |
| 121 | oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs)) |
| 122 | for i, raw := range msg.message.RawLogs { |
| 123 | oplogs[i] = oplog.ParsedLog{} |
| 124 | if err := bson.Unmarshal(raw, &oplogs[i]); err != nil { |
| 125 | // impossible switch, need panic and exit |
| 126 | l.Logger.Panicf("unmarshal oplog[%v] failed[%v]", raw, err) |
| 127 | return |
| 128 | } |
| 129 | l.Logger.Infof("%v", oplogs[i]) // just print for test, users can modify to fulfill different needs |
| 130 | // fmt.Println(oplogs[i]) |
| 131 | } |
| 132 | |
| 133 | if callback := msg.completion; callback != nil { |
| 134 | callback() // exec callback |
| 135 | } |
| 136 | |
| 137 | // get the newest timestamp |
| 138 | n := len(oplogs) |
| 139 | lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp) |
| 140 | er.Ack = lastTs |
| 141 | |
| 142 | l.Logger.Debugf("handle ack[%v]", er.Ack) |
| 143 | |
| 144 | // add logical code below |
| 145 | } |
| 146 | } |
no test coverage detected