()
| 425 | } |
| 426 | |
| 427 | func (c *rawConnection) dispatcherLoop() (err error) { |
| 428 | defer close(c.dispatcherLoopStopped) |
| 429 | var msg proto.Message |
| 430 | state := stateInitial |
| 431 | for { |
| 432 | select { |
| 433 | case <-c.closed: |
| 434 | return ErrClosed |
| 435 | default: |
| 436 | } |
| 437 | select { |
| 438 | case msg = <-c.inbox: |
| 439 | case <-c.closed: |
| 440 | return ErrClosed |
| 441 | } |
| 442 | |
| 443 | metricDeviceRecvMessages.WithLabelValues(c.idString).Inc() |
| 444 | |
| 445 | msgContext, err := messageContext(msg) |
| 446 | if err != nil { |
| 447 | return fmt.Errorf("protocol error: %w", err) |
| 448 | } |
| 449 | l.Debugf("handle %v message", msgContext) |
| 450 | |
| 451 | switch msg := msg.(type) { |
| 452 | case *bep.ClusterConfig: |
| 453 | if state == stateInitial { |
| 454 | state = stateReady |
| 455 | } |
| 456 | case *bep.Close: |
| 457 | return fmt.Errorf("closed by remote: %v", msg.Reason) |
| 458 | default: |
| 459 | if state != stateReady { |
| 460 | return newProtocolError(fmt.Errorf("invalid state %d", state), msgContext) |
| 461 | } |
| 462 | } |
| 463 | |
| 464 | switch msg := msg.(type) { |
| 465 | case *bep.ClusterConfig: |
| 466 | err = c.model.ClusterConfig(clusterConfigFromWire(msg)) |
| 467 | |
| 468 | case *bep.Index: |
| 469 | idx := indexFromWire(msg) |
| 470 | if err := checkIndexConsistency(idx.Files); err != nil { |
| 471 | return newProtocolError(err, msgContext) |
| 472 | } |
| 473 | err = c.handleIndex(idx) |
| 474 | |
| 475 | case *bep.IndexUpdate: |
| 476 | idxUp := indexUpdateFromWire(msg) |
| 477 | if err := checkIndexConsistency(idxUp.Files); err != nil { |
| 478 | return newProtocolError(err, msgContext) |
| 479 | } |
| 480 | err = c.handleIndexUpdate(idxUp) |
| 481 | |
| 482 | case *bep.Request: |
| 483 | if err := checkFilename(msg.Name); err != nil { |
| 484 | return newProtocolError(err, msgContext) |
no test coverage detected