(t *testing.T)
| 536 | } |
| 537 | |
| 538 | func TestHandlerDownsideMqtt(t *testing.T) { |
| 539 | e, _, _ := genHandlerDownsideEngine(t) |
| 540 | h := &handlerDownside{engineImpl: e} |
| 541 | |
| 542 | // upside msg |
| 543 | ch, err := e.pb.Subscribe(sync.TopicUpside) |
| 544 | assert.NoError(t, err) |
| 545 | assert.NotNil(t, ch) |
| 546 | |
| 547 | handler := &msgRPCUpside{t: t} |
| 548 | pro := pubsub.NewProcessor(ch, 0, handler) |
| 549 | pro.Start() |
| 550 | defer pro.Close() |
| 551 | |
| 552 | // event msg |
| 553 | ch0, err := e.pb.Subscribe(eventx.TopicEvent) |
| 554 | assert.NoError(t, err) |
| 555 | assert.NotNil(t, ch) |
| 556 | |
| 557 | handler0 := &msgRPCMqtt{t: t} |
| 558 | pro0 := pubsub.NewProcessor(ch0, 0, handler0) |
| 559 | pro0.Start() |
| 560 | defer pro0.Close() |
| 561 | |
| 562 | // req0 rpc success |
| 563 | engMsgWG.Add(2) |
| 564 | req0 := &specV1.Message{ |
| 565 | Kind: specV1.MessageCMD, |
| 566 | Metadata: map[string]string{ |
| 567 | "cmd": specV1.MessageRPCMqtt, |
| 568 | }, |
| 569 | Content: specV1.LazyValue{ |
| 570 | Value: &specV1.RPCMqttMessage{ |
| 571 | Topic: "test/node", |
| 572 | QoS: 0, |
| 573 | Content: "result", |
| 574 | }, |
| 575 | }, |
| 576 | } |
| 577 | handler.check = func(msg interface{}) { |
| 578 | m, ok := msg.(*specV1.Message) |
| 579 | assert.True(t, ok) |
| 580 | assert.Equal(t, "true", m.Metadata["success"]) |
| 581 | engMsgWG.Done() |
| 582 | } |
| 583 | handler0.check = func(msg interface{}) { |
| 584 | m, ok := msg.(*specV1.Message) |
| 585 | assert.True(t, ok) |
| 586 | request := &specV1.RPCMqttMessage{} |
| 587 | handlerErr := m.Content.Unmarshal(request) |
| 588 | assert.NoError(t, handlerErr) |
| 589 | var buf []byte |
| 590 | if request.Content != nil { |
| 591 | buf = []byte(fmt.Sprintf("%v", request.Content)) |
| 592 | } |
| 593 | assert.Equal(t, string(buf), "result") |
| 594 | engMsgWG.Done() |
| 595 | } |
nothing calls this directly
no test coverage detected