MCPcopy
hub / github.com/mochi-mqtt/server / attachClient

Method attachClient

server.go:406–495  ·  view source on GitHub ↗

attachClient validates an incoming client connection and, if viable, attaches the client to the server, performs session housekeeping, and reads incoming packets.

(cl *Client, listener string)

Source from the content-addressed store, hash-verified

404// attachClient validates an incoming client connection and if viable, attaches the client
405// to the server, performs session housekeeping, and reads incoming packets.
406func (s *Server) attachClient(cl *Client, listener string) error {
407 defer s.Listeners.ClientsWg.Done()
408 s.Listeners.ClientsWg.Add(1)
409
410 go cl.WriteLoop()
411 defer cl.Stop(nil)
412
413 pk, err := s.readConnectionPacket(cl)
414 if err != nil {
415 return fmt.Errorf("read connection: %w", err)
416 }
417
418 cl.ParseConnect(listener, pk)
419 if atomic.LoadInt64(&s.Info.ClientsConnected) >= s.Options.Capabilities.MaximumClients {
420 if cl.Properties.ProtocolVersion < 5 {
421 s.SendConnack(cl, packets.ErrServerUnavailable, false, nil)
422 } else {
423 s.SendConnack(cl, packets.ErrServerBusy, false, nil)
424 }
425
426 return packets.ErrServerBusy
427 }
428
429 code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2]
430 if code != packets.CodeSuccess {
431 if err := s.SendConnack(cl, code, false, nil); err != nil {
432 return fmt.Errorf("invalid connection send ack: %w", err)
433 }
434 return code // [MQTT-3.2.2-7] [MQTT-3.1.4-6]
435 }
436
437 err = s.hooks.OnConnect(cl, pk)
438 if err != nil {
439 return err
440 }
441
442 cl.refreshDeadline(cl.State.Keepalive)
443 if !s.hooks.OnConnectAuthenticate(cl, pk) { // [MQTT-3.1.4-2]
444 err := s.SendConnack(cl, packets.ErrBadUsernameOrPassword, false, nil)
445 if err != nil {
446 return fmt.Errorf("invalid connection send ack: %w", err)
447 }
448
449 return packets.ErrBadUsernameOrPassword
450 }
451
452 atomic.AddInt64(&s.Info.ClientsConnected, 1)
453 defer atomic.AddInt64(&s.Info.ClientsConnected, -1)
454
455 s.hooks.OnSessionEstablish(cl, pk)
456
457 sessionPresent := s.inheritClientSession(pk, cl)
458 s.Clients.Add(cl) // [MQTT-4.1.0-1]
459
460 err = s.SendConnack(cl, code, sessionPresent, nil) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1]
461 if err != nil {
462 return fmt.Errorf("ack connection packet: %w", err)
463 }

Callers 1

EstablishConnectionMethod · 0.95

Calls 15

readConnectionPacketMethod · 0.95
SendConnackMethod · 0.95
validateConnectMethod · 0.95
inheritClientSessionMethod · 0.95
sendLWTMethod · 0.95
UnsubscribeClientMethod · 0.95
WriteLoopMethod · 0.80
ParseConnectMethod · 0.80
refreshDeadlineMethod · 0.80
IsTakenOverMethod · 0.80
ClearInflightsMethod · 0.80

Tested by

no test coverage detected