Invoke puts the request as []byte and call the dispatcher, and then return the response as []byte.
(ctx context.Context, req []byte)
| 38 | |
| 39 | // Invoke puts the request as []byte and call the dispatcher, and then return the response as []byte. |
| 40 | func (s *Protocol) Invoke(ctx context.Context, req []byte) (rsp []byte) { |
| 41 | defer CheckPanic() |
| 42 | reqPackage := requestf.RequestPacket{} |
| 43 | rspPackage := requestf.ResponsePacket{} |
| 44 | is := codec.NewReader(req[4:]) |
| 45 | reqPackage.ReadFrom(is) |
| 46 | |
| 47 | recvPkgTs, ok := current.GetRecvPkgTsFromContext(ctx) |
| 48 | if !ok { |
| 49 | recvPkgTs = time.Now().UnixNano() / 1e6 |
| 50 | } |
| 51 | |
| 52 | // timeout delivery |
| 53 | now := time.Now().UnixNano() / 1e6 |
| 54 | if reqPackage.ITimeout > 0 { |
| 55 | sub := now - recvPkgTs // coroutine scheduling time difference |
| 56 | timeout := int64(reqPackage.ITimeout) - sub |
| 57 | var cancel context.CancelFunc |
| 58 | ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond) |
| 59 | defer cancel() |
| 60 | } |
| 61 | |
| 62 | if reqPackage.HasMessageType(basef.TARSMESSAGETYPEDYED) { |
| 63 | if dyeingKey, ok := reqPackage.Status[current.StatusDyedKey]; ok { |
| 64 | if ok = current.SetDyeingKey(ctx, dyeingKey); !ok { |
| 65 | TLOG.Error("dyeing-debug: set dyeing key in current status error, dyeing key:", dyeingKey) |
| 66 | } |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | // 处理TARS下的调用链追踪 |
| 71 | if reqPackage.HasMessageType(basef.TARSMESSAGETYPETRACE) { |
| 72 | if traceKey, ok := reqPackage.Status[current.StatusTraceKey]; ok { |
| 73 | TLOG.Info("[TARS] servant got a trace request, trace key:", traceKey) |
| 74 | if ok = current.InitTarsTrace(ctx, traceKey); !ok { |
| 75 | TLOG.Error("trace-debug: set trace key in current status error, trace key:", traceKey) |
| 76 | } |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | if reqPackage.CPacketType == basef.TARSONEWAY { |
| 81 | defer func() { |
| 82 | endTime := time.Now().UnixNano() / 1e6 |
| 83 | ReportStatFromServer(reqPackage.SFuncName, "one_way_client", rspPackage.IRet, endTime-recvPkgTs) |
| 84 | }() |
| 85 | } else if reqPackage.CPacketType == basef.TARSNORMAL { |
| 86 | defer func() { |
| 87 | endTime := time.Now().UnixNano() / 1e6 |
| 88 | ReportStatFromServer(reqPackage.SFuncName, "stat_from_server", rspPackage.IRet, endTime-recvPkgTs) |
| 89 | }() |
| 90 | } |
| 91 | // timeout or tars_ping or error |
| 92 | rspPackage.IVersion = reqPackage.IVersion |
| 93 | rspPackage.IRequestId = reqPackage.IRequestId |
| 94 | |
| 95 | select { |
| 96 | case <-ctx.Done(): |
| 97 | rspPackage.IRet = basef.TARSSERVERQUEUETIMEOUT |
nothing calls this directly
no test coverage detected