MCPcopy
hub / github.com/TaskingAI/TaskingAI / execute

Method execute

plugin/bundles/serper/plugins/text_search/plugin.py:10–38  ·  view source on GitHub ↗
(self, credentials: BundleCredentials, plugin_input: PluginInput)

Source from the content-addressed store, hash-verified

8
class TextSearch(PluginHandler):
    """Plugin handler that performs a text search through the Serper endpoint."""

    async def execute(self, credentials: BundleCredentials, plugin_input: PluginInput) -> PluginOutput:
        """Run a Serper search for the requested query and return a trimmed result.

        Reads ``query`` from ``plugin_input.input_params`` and ``SERPER_API_KEY``
        from ``credentials.credentials``, then POSTs them to
        ``https://google.serper.dev/search`` (through ``CONFIG.PROXY``).

        Returns:
            A ``PluginOutput`` whose ``data["result"]`` is a JSON string. It holds
            ``answer_box`` when the response has an ``answerBox`` key, and
            ``organic_search_results`` (title / link / snippet per entry) when
            the response has an ``organic`` key.

        Non-200 responses are handed to ``raise_provider_api_error`` together
        with the response text; presumably that helper raises -- confirm.
        """
        search_terms: str = plugin_input.input_params.get("query")
        api_key: str = credentials.credentials.get("SERPER_API_KEY")

        endpoint = "https://google.serper.dev/search"
        request_headers = {"X-API-KEY": api_key, "Content-Type": "application/json"}
        request_body = json.dumps({"q": search_terms, "num": 10})

        async with ClientSession() as http:
            async with http.post(
                endpoint,
                headers=request_headers,
                data=request_body,
                proxy=CONFIG.PROXY,
            ) as reply:
                if reply.status != 200:
                    raise_provider_api_error(await reply.text())

                body = await reply.json()

                summary = {}
                if "answerBox" in body:
                    summary["answer_box"] = body["answerBox"]
                if "organic" in body:
                    # Keep only the three fields exposed to callers; a missing
                    # field falls back to an empty string.
                    summary["organic_search_results"] = [
                        {field: hit.get(field, "") for field in ("title", "link", "snippet")}
                        for hit in body["organic"]
                    ]

                return PluginOutput(data={"result": json.dumps(summary)})

Callers

nothing calls this directly

Calls 4

PluginOutput (Class) · 0.85
raise_provider_api_error (Function) · 0.50
get (Method) · 0.45
json (Method) · 0.45

Tested by

no test coverage detected