( ctx context.Context, rw io.ReadWriteCloser, connOptions *pogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter, )
| 76 | } |
| 77 | |
| 78 | func (c *controlStream) ServeControlStream( |
| 79 | ctx context.Context, |
| 80 | rw io.ReadWriteCloser, |
| 81 | connOptions *pogs.ConnectionOptions, |
| 82 | tunnelConfigGetter TunnelConfigJSONGetter, |
| 83 | ) error { |
| 84 | registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout) |
| 85 | c.observer.logConnecting(c.connIndex, c.edgeAddress, c.protocol) |
| 86 | registrationDetails, err := registrationClient.RegisterConnection( |
| 87 | ctx, |
| 88 | c.tunnelProperties.Credentials.Auth(), |
| 89 | c.tunnelProperties.Credentials.TunnelID, |
| 90 | connOptions, |
| 91 | c.connIndex, |
| 92 | c.edgeAddress) |
| 93 | if err != nil { |
| 94 | defer registrationClient.Close() |
| 95 | if err.Error() == DuplicateConnectionError { |
| 96 | c.observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc() |
| 97 | return errDuplicationConnection |
| 98 | } |
| 99 | c.observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc() |
| 100 | return serverRegistrationErrorFromRPC(err) |
| 101 | } |
| 102 | c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc() |
| 103 | |
| 104 | c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol) |
| 105 | c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location, c.edgeAddress) |
| 106 | c.connectedFuse.Connected() |
| 107 | |
| 108 | // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration |
| 109 | if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged { |
| 110 | if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil { |
| 111 | if err := registrationClient.SendLocalConfiguration(ctx, tunnelConfig); err != nil { |
| 112 | c.observer.metrics.localConfigMetrics.pushesErrors.Inc() |
| 113 | c.observer.log.Err(err).Msg("unable to send local configuration") |
| 114 | } |
| 115 | c.observer.metrics.localConfigMetrics.pushes.Inc() |
| 116 | } else { |
| 117 | c.observer.log.Err(err).Msg("failed to obtain current configuration") |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | return c.waitForUnregister(ctx, registrationClient) |
| 122 | } |
| 123 | |
| 124 | func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error { |
| 125 | // wait for connection termination or start of graceful shutdown |
nothing calls this directly
no test coverage detected