attachClient validates an incoming client connection and, if viable, attaches the client to the server, performs session housekeeping, and reads incoming packets.
(cl *Client, listener string)
| 404 | // attachClient validates an incoming client connection and if viable, attaches the client |
| 405 | // to the server, performs session housekeeping, and reads incoming packets. |
| 406 | func (s *Server) attachClient(cl *Client, listener string) error { |
| 407 | defer s.Listeners.ClientsWg.Done() |
| 408 | s.Listeners.ClientsWg.Add(1) |
| 409 | |
| 410 | go cl.WriteLoop() |
| 411 | defer cl.Stop(nil) |
| 412 | |
| 413 | pk, err := s.readConnectionPacket(cl) |
| 414 | if err != nil { |
| 415 | return fmt.Errorf("read connection: %w", err) |
| 416 | } |
| 417 | |
| 418 | cl.ParseConnect(listener, pk) |
| 419 | if atomic.LoadInt64(&s.Info.ClientsConnected) >= s.Options.Capabilities.MaximumClients { |
| 420 | if cl.Properties.ProtocolVersion < 5 { |
| 421 | s.SendConnack(cl, packets.ErrServerUnavailable, false, nil) |
| 422 | } else { |
| 423 | s.SendConnack(cl, packets.ErrServerBusy, false, nil) |
| 424 | } |
| 425 | |
| 426 | return packets.ErrServerBusy |
| 427 | } |
| 428 | |
| 429 | code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2] |
| 430 | if code != packets.CodeSuccess { |
| 431 | if err := s.SendConnack(cl, code, false, nil); err != nil { |
| 432 | return fmt.Errorf("invalid connection send ack: %w", err) |
| 433 | } |
| 434 | return code // [MQTT-3.2.2-7] [MQTT-3.1.4-6] |
| 435 | } |
| 436 | |
| 437 | err = s.hooks.OnConnect(cl, pk) |
| 438 | if err != nil { |
| 439 | return err |
| 440 | } |
| 441 | |
| 442 | cl.refreshDeadline(cl.State.Keepalive) |
| 443 | if !s.hooks.OnConnectAuthenticate(cl, pk) { // [MQTT-3.1.4-2] |
| 444 | err := s.SendConnack(cl, packets.ErrBadUsernameOrPassword, false, nil) |
| 445 | if err != nil { |
| 446 | return fmt.Errorf("invalid connection send ack: %w", err) |
| 447 | } |
| 448 | |
| 449 | return packets.ErrBadUsernameOrPassword |
| 450 | } |
| 451 | |
| 452 | atomic.AddInt64(&s.Info.ClientsConnected, 1) |
| 453 | defer atomic.AddInt64(&s.Info.ClientsConnected, -1) |
| 454 | |
| 455 | s.hooks.OnSessionEstablish(cl, pk) |
| 456 | |
| 457 | sessionPresent := s.inheritClientSession(pk, cl) |
| 458 | s.Clients.Add(cl) // [MQTT-4.1.0-1] |
| 459 | |
| 460 | err = s.SendConnack(cl, code, sessionPresent, nil) // [MQTT-3.1.4-5] [MQTT-3.2.0-1] [MQTT-3.2.0-2] &[MQTT-3.14.0-1] |
| 461 | if err != nil { |
| 462 | return fmt.Errorf("ack connection packet: %w", err) |
| 463 | } |
no test coverage detected