MCPcopy
hub / github.com/cloudflare/cloudflared / ServeControlStream

Method ServeControlStream

connection/control.go:78–122  ·  view source on GitHub ↗
(
	ctx context.Context,
	rw io.ReadWriteCloser,
	connOptions *pogs.ConnectionOptions,
	tunnelConfigGetter TunnelConfigJSONGetter,
)

Source from the content-addressed store, hash-verified

76}
77
78func (c *controlStream) ServeControlStream(
79 ctx context.Context,
80 rw io.ReadWriteCloser,
81 connOptions *pogs.ConnectionOptions,
82 tunnelConfigGetter TunnelConfigJSONGetter,
83) error {
84 registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout)
85 c.observer.logConnecting(c.connIndex, c.edgeAddress, c.protocol)
86 registrationDetails, err := registrationClient.RegisterConnection(
87 ctx,
88 c.tunnelProperties.Credentials.Auth(),
89 c.tunnelProperties.Credentials.TunnelID,
90 connOptions,
91 c.connIndex,
92 c.edgeAddress)
93 if err != nil {
94 defer registrationClient.Close()
95 if err.Error() == DuplicateConnectionError {
96 c.observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
97 return errDuplicationConnection
98 }
99 c.observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
100 return serverRegistrationErrorFromRPC(err)
101 }
102 c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
103
104 c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol)
105 c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location, c.edgeAddress)
106 c.connectedFuse.Connected()
107
108 // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
109 if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
110 if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
111 if err := registrationClient.SendLocalConfiguration(ctx, tunnelConfig); err != nil {
112 c.observer.metrics.localConfigMetrics.pushesErrors.Inc()
113 c.observer.log.Err(err).Msg("unable to send local configuration")
114 }
115 c.observer.metrics.localConfigMetrics.pushes.Inc()
116 } else {
117 c.observer.log.Err(err).Msg("failed to obtain current configuration")
118 }
119 }
120
121 return c.waitForUnregister(ctx, registrationClient)
122}
123
124func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error {
125 // wait for connection termination or start of graceful shutdown

Callers

nothing calls this directly

Calls 13

waitForUnregisterMethod · 0.95
logConnectingMethod · 0.80
logConnectedMethod · 0.80
sendConnectedEventMethod · 0.80
RegisterConnectionMethod · 0.65
CloseMethod · 0.65
ConnectedMethod · 0.65
GetConfigJSONMethod · 0.65
AuthMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected