(self, event_loop, ability, executor, operation, data_svc, knowledge_svc, fire_event_mock)
| 139 | assert len(knowledge_base_r) == 1 |
| 140 | |
def test_create_relationship_source_fact(self, event_loop, ability, executor, operation, data_svc, knowledge_svc, fire_event_mock):
    """Create the same relationship from two links when its source fact is seeded.

    ``fact1`` is placed in the operation's source before either link runs.
    After both links call ``create_relationships`` with the same relationship,
    the knowledge service is expected to return exactly one fact for the
    operation's source id, exactly one fact for the operation id, and the
    fact found under the source id is expected to list two collectors.
    """
    run = event_loop.run_until_complete

    psh_executor = executor(name='psh', platform='windows')
    shared_ability = ability(ability_id='123', executors=[psh_executor])

    seeded_fact = Fact(trait='remote.host.fqdn', value='dc')
    discovered_fact = Fact(trait='domain.user.name', value='Bob')
    admin_relationship = Relationship(source=seeded_fact, edge='has_admin', target=discovered_fact)

    first_link = Link(command='echo "Bob"', paw='123456', ability=shared_ability,
                      id='111111', executor=psh_executor)

    # Build the operation with a source that already contains the
    # relationship's source fact.
    sample_adversary = Adversary(name='sample', adversary_id='XYZ', atomic_ordering=[],
                                 description='test')
    seeded_source = Source(id='test-source', facts=[seeded_fact])
    test_op = operation(name='test-op', agents=[], adversary=sample_adversary, source=seeded_source)

    run(data_svc.store(test_op.source))
    run(test_op._init_source())
    run(first_link.create_relationships([admin_relationship], test_op))

    # A second link, using a different paw and id, reports the same relationship.
    second_link = Link(command='echo "Bob"', paw='789100', ability=shared_ability,
                       id='222222', executor=psh_executor)
    run(second_link.create_relationships([admin_relationship], test_op))

    source_facts = run(knowledge_svc.get_facts({'source': test_op.source.id}))
    operation_facts = run(knowledge_svc.get_facts({'source': test_op.id}))

    assert len(source_facts) == 1
    assert len(operation_facts) == 1
    assert len(source_facts[0].collected_by) == 2
| 164 | |
| 165 | def test_save_discover_seeded_fact_not_in_command(self, event_loop, ability, executor, operation, knowledge_svc, data_svc, fire_event_mock): |
| 166 | test_executor = executor(name='psh', platform='windows') |
nothing calls this directly
no test coverage detected