MCPcopy
hub / github.com/dgraph-io/dgraph / Subscribe

Method Subscribe

graphql/admin/http.go:141–212  ·  view source on GitHub ↗
(
	ctx context.Context,
	document,
	operationName string,
	variableValues map[string]interface{})

Source from the content-addressed store, hash-verified

139}
140
141func (gs *graphqlSubscription) Subscribe(
142 ctx context.Context,
143 document,
144 operationName string,
145 variableValues map[string]interface{}) (<-chan interface{}, error) {
146
147 reqHeader := http.Header{}
148 // library (graphql-transport-ws) passes the headers which are part of the INIT payload to us
149 // in the context. We are extracting those headers and passing them along.
150 headerPayload, _ := ctx.Value("Header").(json.RawMessage)
151 if len(headerPayload) > 0 {
152 headers := make(map[string]interface{})
153 if err := json.Unmarshal(headerPayload, &headers); err != nil {
154 return nil, err
155 }
156
157 for k, v := range headers {
158 if vStr, ok := v.(string); ok {
159 reqHeader.Set(k, vStr)
160 }
161 }
162 }
163
164 // Earlier the graphql-transport-ws library was ignoring the http headers in the request.
165 // The library was relying upon the information present in the request payload. This was
166 // blocker for the cloud team because the only control cloud has is over the HTTP headers.
167 // This fix ensures that we are setting the request headers if not provided in the payload.
168 httpHeaders, _ := ctx.Value("RequestHeader").(http.Header)
169 if len(httpHeaders) > 0 {
170 for k := range httpHeaders {
171 if len(strings.TrimSpace(reqHeader.Get(k))) == 0 {
172 reqHeader.Set(k, httpHeaders.Get(k))
173 }
174 }
175 }
176
177 req := &schema.Request{
178 OperationName: operationName,
179 Query: document,
180 Variables: variableValues,
181 Header: reqHeader,
182 }
183
184 audit.AuditWebSockets(ctx, req)
185 namespace := x.ExtractNamespaceHTTP(&http.Request{Header: reqHeader})
186 glog.V(2).Infof("namespace: %d. Got GraphQL request over HTTP.", namespace)
187 // first load the schema, then do anything else
188 if err := LazyLoadSchema(namespace); err != nil {
189 return nil, err
190 }
191 if err := gs.isValid(namespace); err != nil {
192 glog.Errorf("namespace: %d. graphqlSubscription not initialized: %s", namespace, err)
193 return nil, errors.New(resolve.ErrInternal)
194 }
195
196 gs.graphqlHandler.pollerMux.RLock()
197 poller := gs.graphqlHandler.poller[namespace]
198 gs.graphqlHandler.pollerMux.RUnlock()

Callers

nothing calls this directly

Calls 14

isValidMethod · 0.95
AuditWebSocketsFunction · 0.92
ExtractNamespaceHTTPFunction · 0.92
LazyLoadSchemaFunction · 0.85
InfofMethod · 0.80
RLockMethod · 0.80
RUnlockMethod · 0.80
AddSubscriberMethod · 0.80
TerminateSubscriptionMethod · 0.80
SetMethod · 0.65
GetMethod · 0.65
ValueMethod · 0.45

Tested by

no test coverage detected