Run a plugin. :param bundle_instance_id: the bundle instance ID :param plugin_id: the plugin ID :param parameters: the parameters for the API call :return: the response of the API call
(
bundle_instance_id: str,
plugin_id: str,
parameters: Dict,
)
| 16 | |
| 17 | |
| 18 | async def run_plugin( |
| 19 | bundle_instance_id: str, |
| 20 | plugin_id: str, |
| 21 | parameters: Dict, |
| 22 | ) -> Dict: |
| 23 | """ |
| 24 | Run a plugin |
| 25 | :param bundle_instance_id: the bundle ID |
| 26 | :param plugin_id: the action ID |
| 27 | :param parameters: the parameters for the API call |
| 28 | :return: the response of the API call |
| 29 | """ |
| 30 | bundle_instance: BundleInstance = await bundle_instance_ops.get( |
| 31 | bundle_instance_id=bundle_instance_id, |
| 32 | ) |
| 33 | |
| 34 | try: |
| 35 | async with aiohttp.ClientSession() as session: |
| 36 | response = await session.post( |
| 37 | f"{CONFIG.TASKINGAI_PLUGIN_URL}/v1/execute", |
| 38 | json={ |
| 39 | "bundle_id": bundle_instance.bundle_id, |
| 40 | "plugin_id": plugin_id, |
| 41 | "input_params": parameters, |
| 42 | "encrypted_credentials": bundle_instance.encrypted_credentials, |
| 43 | "project_id": CONFIG.PROJECT_ID |
| 44 | }, |
| 45 | ) |
| 46 | |
| 47 | bytes_read = 0 |
| 48 | max_size = 64 * 1024 |
| 49 | data_chunks = [] |
| 50 | |
| 51 | # check the size of the response |
| 52 | async for chunk in response.content.iter_any(): |
| 53 | bytes_read += len(chunk) |
| 54 | if bytes_read > max_size: |
| 55 | raise ClientResponseError( |
| 56 | response.request_info, response.history, message="Response too large", status=response.status |
| 57 | ) |
| 58 | data_chunks.append(chunk) |
| 59 | |
| 60 | data_bytes = b"".join(data_chunks) |
| 61 | try: |
| 62 | # Assuming the response is JSON and decode here |
| 63 | data_dict = json.loads(data_bytes.decode("utf-8")) |
| 64 | except json.JSONDecodeError: |
| 65 | # Handle non-JSON response or decode error |
| 66 | return {"status": 500, "data": {"error": "Failed to decode the plugin response"}} |
| 67 | |
| 68 | response_wrapper = ResponseWrapper(response.status, data_dict) |
| 69 | |
| 70 | if response.status == 200: |
| 71 | data = response_wrapper.json().get("data") |
| 72 | return {"status": data["status"], "data": data["data"]} |
| 73 | |
| 74 | return {"status": response.status, "data": response_wrapper.json().get("error")} |
| 75 |
no test coverage detected