(linkId baseds.LinkId, client AbstractRpcClient)
| 530 | } |
| 531 | |
| 532 | func (router *WshRouter) runLinkClientRecvLoop(linkId baseds.LinkId, client AbstractRpcClient) { |
| 533 | defer func() { |
| 534 | panichandler.PanicHandler("WshRouter:runLinkClientRecvLoop", recover()) |
| 535 | }() |
| 536 | exitReason := "unknown" |
| 537 | lmForLog := router.getLinkMeta(linkId) |
| 538 | linkName := fmt.Sprintf("%d", linkId) |
| 539 | if lmForLog != nil { |
| 540 | linkName = lmForLog.Name() |
| 541 | } |
| 542 | log.Printf("link recvloop start for %s", linkName) |
| 543 | defer log.Printf("link recvloop done for %s (%s)", linkName, exitReason) |
| 544 | for { |
| 545 | msgBytes, ok := client.RecvRpcMessage() |
| 546 | if !ok { |
| 547 | exitReason = "recv-eof" |
| 548 | break |
| 549 | } |
| 550 | var rpcMsg RpcMessage |
| 551 | err := json.Unmarshal(msgBytes, &rpcMsg) |
| 552 | if err != nil { |
| 553 | continue |
| 554 | } |
| 555 | lm := router.getLinkMeta(linkId) |
| 556 | if lm == nil { |
| 557 | exitReason = "link-gone" |
| 558 | break |
| 559 | } |
| 560 | if rpcMsg.IsRpcRequest() { |
| 561 | if lm.sourceRouteId != "" { |
| 562 | rpcMsg.Source = lm.sourceRouteId |
| 563 | } |
| 564 | if rpcMsg.Route == "" { |
| 565 | rpcMsg.Route = DefaultRoute |
| 566 | } |
| 567 | msgBytes, err = json.Marshal(rpcMsg) |
| 568 | if err != nil { |
| 569 | continue |
| 570 | } |
| 571 | // allow control routes even for untrusted links (for authentication) |
| 572 | isControlRoute := rpcMsg.Route == ControlRoute || rpcMsg.Route == ControlRootRoute |
| 573 | if !lm.trusted { |
| 574 | if !isControlRoute { |
| 575 | sendControlUnauthenticatedErrorResponse(rpcMsg, *lm, router) |
| 576 | continue |
| 577 | } |
| 578 | log.Printf("wshrouter control-msg route=%s link=%s command=%s source=%s", rpcMsg.Route, lm.Name(), rpcMsg.Command, rpcMsg.Source) |
| 579 | } |
| 580 | } else { |
| 581 | // non-request messages (responses) |
| 582 | if !lm.trusted { |
| 583 | // allow responses to RPCs we initiated |
| 584 | if rpcMsg.ResId == "" || router.getRouteInfo(rpcMsg.ResId) == nil { |
| 585 | continue |
| 586 | } |
| 587 | } |
| 588 | } |
| 589 | router.inputCh <- baseds.RpcInputChType{MsgBytes: msgBytes, IngressLinkId: linkId} |
no test coverage detected