MCPcopy
hub / github.com/alibaba/MongoShake / handler

Method handler

receiver/replayer.go:112–146  ·  view source on GitHub ↗

Users should modify this function according to different demands.

()

Source from the content-addressed store, hash-verified

110 * Users should modify this function according to different demands.
111 */
112func (er *ExampleReplayer) handler() {
113 for msg := range er.pendingQueue {
114 count := uint64(len(msg.message.RawLogs))
115 if count == 0 {
116 // probe request
117 continue
118 }
119
120 // parse batched message
121 oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs))
122 for i, raw := range msg.message.RawLogs {
123 oplogs[i] = oplog.ParsedLog{}
124 if err := bson.Unmarshal(raw, &oplogs[i]); err != nil {
125 // impossible switch, need panic and exit
126 l.Logger.Panicf("unmarshal oplog[%v] failed[%v]", raw, err)
127 return
128 }
129 l.Logger.Infof("%v", oplogs[i]) // just print for test, users can modify to fulfill different needs
130 // fmt.Println(oplogs[i])
131 }
132
133 if callback := msg.completion; callback != nil {
134 callback() // exec callback
135 }
136
137 // get the newest timestamp
138 n := len(oplogs)
139 lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
140 er.Ack = lastTs
141
142 l.Logger.Debugf("handle ack[%v]", er.Ack)
143
144 // add logical code below
145 }
146}

Callers 1

NewExampleReplayerFunction · 0.95

Calls 3

PanicfMethod · 0.80
InfofMethod · 0.80
DebugfMethod · 0.80

Tested by

no test coverage detected