(self, paw, ability_id, obfuscator, facts=())
| 247 | return dict(link=link.unique) |
| 248 | |
async def task_agent_with_ability(self, paw, ability_id, obfuscator, facts=()):
    """Task each agent matching ``paw`` with the abilities matching ``ability_id``.

    Agents are looked up through the ``data_svc`` service by paw. For every
    agent found, the abilities whose ``ability_id`` matches are looked up
    (once per agent) and handed to ``agent.task`` together with the given
    obfuscator and facts.

    :param paw: value used to match agents in the data service
    :param ability_id: value used to match abilities in the data service
    :param obfuscator: passed through unchanged to ``agent.task``
    :param facts: passed through unchanged to ``agent.task`` (defaults to an
        empty tuple)
    :return: a single list with the links returned by every ``agent.task``
        call, in agent order; empty when no agent matches
    """
    collected_links = []
    matching_agents = await self.get_service('data_svc').locate('agents', dict(paw=paw))
    for target_agent in matching_agents:
        self.log.debug('Tasking %s with %s' % (paw, ability_id))
        # The ability lookup is intentionally repeated for every agent so
        # each task call receives its own freshly located result.
        ability_filter = dict(ability_id=ability_id)
        matched_abilities = await self.get_service('data_svc').locate('abilities', match=ability_filter)
        agent_links = await target_agent.task(
            abilities=matched_abilities,
            obfuscator=obfuscator,
            facts=facts,
        )
        collected_links.extend(agent_links)
    return collected_links
| 260 | |
| 261 | async def get_link_pin(self, json_data): |
| 262 | link = await self.get_service('app_svc').find_link(json_data['link']) |
no test coverage detected