(self, event_loop, app_svc, contact_svc, file_svc, data_svc, learning_svc, fire_event_mock,
op_with_learning_and_seeded, make_test_link, make_test_result, knowledge_svc)
| 426 | assert len(test_link.facts) == 0 |
| 427 | |
def test_facts(self, event_loop, app_svc, contact_svc, file_svc, data_svc, learning_svc, fire_event_mock,
               op_with_learning_and_seeded, make_test_link, make_test_result, knowledge_svc):
    """Verify that saving a result yields one fact on its link and two facts on the operation.

    The operation and its source are stored, a link is attached, and a result
    for that link is saved through the contact service. The test then checks:

    * the link carries exactly one fact ('host.ip.address' / '10.10.10.10');
    * ``all_facts()`` returns two entries whose ``source`` values include both
      the operation id and the operation's source id;
    * the operation report lists two facts.

    Fixtures that are not referenced in the body are still requested,
    presumably for their setup side effects -- TODO confirm.
    """
    operation = op_with_learning_and_seeded
    run = event_loop.run_until_complete

    run(data_svc.store(operation.source))
    link = make_test_link(9876)
    operation.add_link(link)

    result = make_test_result(link.id)
    run(data_svc.store(operation))
    # The operation is never started through 'run' in this test, so its
    # source has to be initialised by hand.
    run(operation._init_source())
    run(contact_svc._save(result))

    # Saving the result should have attached a single fact to the link.
    assert len(link.facts) == 1
    learned = link.facts[0]
    assert learned.trait == 'host.ip.address'
    assert learned.value == '10.10.10.10'

    # The operation should know two facts: one sourced from the operation
    # itself and one from its fact source.
    all_known = run(operation.all_facts())
    assert len(all_known) == 2
    origins = [entry.source for entry in all_known]
    assert operation.id in origins
    assert operation.source.id in origins

    summary = run(operation.report(file_svc, data_svc))
    assert len(summary['facts']) == 2
| 451 | |
| 452 | async def test_wait_for_links_completion_ignorable_link(self, make_test_link, operation_agent): |
| 453 | test_agent = operation_agent |
nothing calls this directly
no test coverage detected