mergeDelta updates e.store with facts from e.deltaStore. For facts with custom lattice join operations, replaces facts instead of adding.
()
| 417 | // mergeDelta updates e.store with facts from e.deltaStore. |
| 418 | // For facts with custom lattice join operations, replaces facts instead of adding. |
| 419 | func (e *engine) mergeDelta() error { |
| 420 | err := factstore.GetAllFacts(e.deltaStore, func(fact ast.Atom) error { |
| 421 | pred := fact.Predicate |
| 422 | fundep, mergePred, ok := e.hasMergePredicate(pred) |
| 423 | if !ok { |
| 424 | // Default case: just add the new fact. |
| 425 | e.store.Add(fact) |
| 426 | return nil |
| 427 | } |
| 428 | |
| 429 | // Merge-predicate case: add fact or replace existing fact. |
| 430 | // TODO: Generalize to merge predicate with n * 3 columns. |
| 431 | if len(fundep.Target) != 1 { |
| 432 | return fmt.Errorf("merging with |target vars| != 1 not implemented: %v", fundep.Target) |
| 433 | } |
| 434 | targetColumn := fundep.Target[0] |
| 435 | |
| 436 | // Query existing facts whose columns agree on fundep.Source values. |
| 437 | queryArgs := make([]ast.BaseTerm, pred.Arity, pred.Arity) |
| 438 | for i := 0; i < pred.Arity; i++ { |
| 439 | queryArgs[i] = ast.Variable{"_"} |
| 440 | } |
| 441 | for i := range fundep.Source { |
| 442 | queryArgs[i] = fact.Args[i] |
| 443 | } |
| 444 | queryExisting := ast.Atom{pred, queryArgs} |
| 445 | existing := false |
| 446 | e.store.GetFacts(queryExisting, func(existingFact ast.Atom) error { |
| 447 | existing = true |
| 448 | if fact.Equals(existingFact) { |
| 449 | return nil // nothing to do. |
| 450 | } |
| 451 | |
| 452 | // Evaluate merge predicate (top-down) to construct replacement fact. |
| 453 | merged := false |
| 454 | |
| 455 | // Prepare top-down query with merge predicate. |
| 456 | mergeQuery := ast.Atom{Predicate: mergePred, Args: []ast.BaseTerm{ |
| 457 | existingFact.Args[targetColumn], |
| 458 | |
| 459 | fact.Args[targetColumn], |
| 460 | ast.Variable{"_"}, |
| 461 | }} |
| 462 | err := e.newContext().EvalQuery(mergeQuery, mergePredMode, unionfind.New(), func(mergeFact ast.Atom) error { |
| 463 | merged = true |
| 464 | value := mergeFact.Args[2] |
| 465 | fact.Args[fundep.Target[0]] = value |
| 466 | if !existingFact.Equals(fact) { |
| 467 | if storeWithRemove, ok := e.store.(factstore.FactStoreWithRemove); ok { |
| 468 | storeWithRemove.Remove(existingFact) |
| 469 | } |
| 470 | e.store.Add(fact) |
| 471 | return errBreak |
| 472 | } |
| 473 | return nil |
| 474 | }) |
| 475 | if err == errBreak { |
| 476 | return nil // Already added. |
no test coverage detected