| 278 | } |
| 279 | |
| 280 | func (m *Manager) dispatchInvocation(parentCtx context.Context, fn *Function) { |
| 281 | if fn == nil { |
| 282 | return |
| 283 | } |
| 284 | if m.isStopping() { |
| 285 | m.respf(fn, 503, "functions manager is stopping") |
| 286 | return |
| 287 | } |
| 288 | |
| 289 | handler, scheduleKey, ok := m.lookupFunctionRoute(*fn) |
| 290 | if !ok { |
| 291 | m.Infof("skipping execution of '%s': unregistered function", fn.Name) |
| 292 | m.respf(fn, 501, "unregistered function: %s", fn.Name) |
| 293 | return |
| 294 | } |
| 295 | if handler == nil { |
| 296 | m.Warningf("skipping execution of '%s': nil function registered", fn.Name) |
| 297 | m.respf(fn, 501, "nil function: %s", fn.Name) |
| 298 | return |
| 299 | } |
| 300 | |
| 301 | reqCtx, cancel := context.WithCancel(parentCtx) |
| 302 | switch m.trySetInvocationState(fn.UID, stateQueued, cancel, scheduleKey) { |
| 303 | case invocationAdmissionAccepted: |
| 304 | // admitted |
| 305 | case invocationAdmissionDuplicateActive: |
| 306 | cancel() |
| 307 | // Do not emit terminal output for duplicates of an active UID. Emitting |
| 308 | // via tryFinalize would mutate active tracking for the original invocation. |
| 309 | m.Warningf("ignoring duplicate active transaction id: %s", fn.UID) |
| 310 | m.observeDuplicateUIDIgnored() |
| 311 | return |
| 312 | case invocationAdmissionDuplicateTombstone: |
| 313 | cancel() |
| 314 | m.Warningf("ignoring duplicate recently finalized transaction id: %s", fn.UID) |
| 315 | m.observeDuplicateUIDIgnored() |
| 316 | return |
| 317 | case invocationAdmissionInvalid: |
| 318 | cancel() |
| 319 | m.Warningf("ignoring invalid transaction id: %q", fn.UID) |
| 320 | return |
| 321 | default: |
| 322 | cancel() |
| 323 | m.Warningf("ignoring transaction id '%s': unsupported admission state", fn.UID) |
| 324 | return |
| 325 | } |
| 326 | |
| 327 | req := &invocationRequest{ |
| 328 | fn: fn, |
| 329 | handler: handler, |
| 330 | ctx: reqCtx, |
| 331 | scheduleKey: scheduleKey, |
| 332 | } |
| 333 | |
| 334 | if m.scheduler == nil { |
| 335 | cancel() |
| 336 | m.respf(fn, 503, "functions manager is stopping") |
| 337 | return |