MCPcopy
hub / github.com/baetyl/baetyl / TestHandlerDownsideMqtt

Function TestHandlerDownsideMqtt

engine/msg_handler_test.go:538–599  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

536}
537
538func TestHandlerDownsideMqtt(t *testing.T) {
539 e, _, _ := genHandlerDownsideEngine(t)
540 h := &handlerDownside{engineImpl: e}
541
542 // upside msg
543 ch, err := e.pb.Subscribe(sync.TopicUpside)
544 assert.NoError(t, err)
545 assert.NotNil(t, ch)
546
547 handler := &msgRPCUpside{t: t}
548 pro := pubsub.NewProcessor(ch, 0, handler)
549 pro.Start()
550 defer pro.Close()
551
552 // event msg
553 ch0, err := e.pb.Subscribe(eventx.TopicEvent)
554 assert.NoError(t, err)
555 assert.NotNil(t, ch)
556
557 handler0 := &msgRPCMqtt{t: t}
558 pro0 := pubsub.NewProcessor(ch0, 0, handler0)
559 pro0.Start()
560 defer pro0.Close()
561
562 // req0 rpc success
563 engMsgWG.Add(2)
564 req0 := &specV1.Message{
565 Kind: specV1.MessageCMD,
566 Metadata: map[string]string{
567 "cmd": specV1.MessageRPCMqtt,
568 },
569 Content: specV1.LazyValue{
570 Value: &specV1.RPCMqttMessage{
571 Topic: "test/node",
572 QoS: 0,
573 Content: "result",
574 },
575 },
576 }
577 handler.check = func(msg interface{}) {
578 m, ok := msg.(*specV1.Message)
579 assert.True(t, ok)
580 assert.Equal(t, "true", m.Metadata["success"])
581 engMsgWG.Done()
582 }
583 handler0.check = func(msg interface{}) {
584 m, ok := msg.(*specV1.Message)
585 assert.True(t, ok)
586 request := &specV1.RPCMqttMessage{}
587 handlerErr := m.Content.Unmarshal(request)
588 assert.NoError(t, handlerErr)
589 var buf []byte
590 if request.Content != nil {
591 buf = []byte(fmt.Sprintf("%v", request.Content))
592 }
593 assert.Equal(t, string(buf), "result")
594 engMsgWG.Done()
595 }

Callers

nothing calls this directly

Calls 5

OnMessageMethod · 0.95
genHandlerDownsideEngineFunction · 0.85
StartMethod · 0.65
CloseMethod · 0.65
SubscribeMethod · 0.45

Tested by

no test coverage detected