Run kicks off the function engine. Use the context to shut it down.
(ctx context.Context)
| 365 | |
| 366 | // Run kicks off the function engine. Use the context to shut it down. |
| 367 | func (obj *Lang) Run(ctx context.Context) (reterr error) { |
| 368 | defer obj.wg.Wait() |
| 369 | |
| 370 | ctx, cancel := context.WithCancel(ctx) // wrap parent |
| 371 | defer cancel() |
| 372 | |
| 373 | //<-obj.funcs.Started() // wait for startup (will not block forever) |
| 374 | |
| 375 | // Sanity checks for graph size. |
| 376 | //if count := obj.funcs.NumVertices(); count != 0 { |
| 377 | // return fmt.Errorf("expected empty graph on start, got %d vertices", count) |
| 378 | //} |
| 379 | //defer func() { |
| 380 | // if count := obj.funcs.NumVertices(); count != 0 { |
| 381 | // err := fmt.Errorf("expected empty graph on exit, got %d vertices", count) |
| 382 | // reterr = errwrap.Append(reterr, err) |
| 383 | // } |
| 384 | //}() |
| 385 | |
| 386 | txn := obj.funcs.Txn() |
| 387 | defer txn.Free() // remember to call Free() |
| 388 | txn.AddGraph(obj.graph) |
| 389 | if err := txn.Commit(); err != nil { |
| 390 | return errwrap.Wrapf(err, "error adding to function graph engine") |
| 391 | } |
| 392 | defer func() { |
| 393 | if err := txn.Reverse(); err != nil { // should remove everything we added |
| 394 | reterr = errwrap.Append(reterr, err) |
| 395 | } |
| 396 | }() |
| 397 | |
| 398 | //obj.Logf("function engine starting took: %s", time.Since(timing)) |
| 399 | // wait for some activity |
| 400 | obj.Logf("stream...") |
| 401 | |
| 402 | tableChan := obj.funcs.Stream() // after obj.funcs.Setup runs |
| 403 | |
| 404 | obj.wg.Add(1) |
| 405 | go func() { |
| 406 | defer obj.wg.Done() |
| 407 | defer close(obj.streamChan) |
| 408 | defer cancel() // if this loop errors, it should cancel and err |
| 409 | |
| 410 | var table interfaces.Table |
| 411 | var ok bool |
| 412 | for { |
| 413 | select { |
| 414 | case table, ok = <-tableChan: |
| 415 | if !ok { |
| 416 | return |
| 417 | } |
| 418 | |
| 419 | case <-ctx.Done(): |
| 420 | obj.errAppend(ctx.Err()) |
| 421 | return |
| 422 | } |
| 423 | |
| 424 | // this call returns the graph |