MCPcopy
hub / github.com/TaskingAI/TaskingAI / execute

Method execute

plugin/bundles/serper/plugins/maps_search/plugin.py:10–41  ·  view source on GitHub ↗
(self, credentials: BundleCredentials, plugin_input: PluginInput)

Source from the content-addressed store, hash-verified

8
class MapsSearch(PluginHandler):
    """Plugin handler that POSTs a search query to the Serper maps endpoint."""

    async def execute(self, credentials: BundleCredentials, plugin_input: PluginInput) -> PluginOutput:
        """Run a maps search and return a trimmed summary of each place found.

        Args:
            credentials: Bundle credentials; the ``SERPER_API_KEY`` entry is
                sent as the ``X-API-KEY`` request header.
            plugin_input: Plugin input; the ``query`` entry of
                ``input_params`` is sent as the search text (``q``).

        Returns:
            A ``PluginOutput`` whose ``data["result"]`` is a JSON string. The
            encoded object carries an ``organic_places_results`` list only
            when the response body contains a ``places`` key; otherwise the
            encoded object is empty.

        A non-200 response is handed to ``raise_provider_api_error`` together
        with the response text (presumably raising; confirm against that
        helper, which is defined outside this block).
        """
        search_text: str = plugin_input.input_params.get("query")
        api_key: str = credentials.credentials.get("SERPER_API_KEY")

        endpoint = "https://google.serper.dev/maps"
        request_headers = {"X-API-KEY": api_key, "Content-Type": "application/json"}
        # The body is serialized up front and sent via ``data=``; the request
        # always asks for 10 results.
        request_body = json.dumps({"q": search_text, "num": 10})

        async with ClientSession() as session, session.post(
            endpoint, headers=request_headers, data=request_body, proxy=CONFIG.PROXY
        ) as response:
            if response.status != 200:
                raise_provider_api_error(await response.text())

            response_data = await response.json()

            summary = {}
            if "places" in response_data:
                # Keep only the fields exposed to the caller; absent fields
                # fall back to an empty string (or an empty list for "types").
                summary["organic_places_results"] = [
                    {
                        "title": place.get("title", ""),
                        "address": place.get("address", ""),
                        "latitude": place.get("latitude", ""),
                        "longitude": place.get("longitude", ""),
                        "rating": place.get("rating", ""),
                        "types": place.get("types", []),
                        "website": place.get("website", ""),
                        "phoneNumber": place.get("phoneNumber", ""),
                    }
                    for place in response_data["places"]
                ]

            return PluginOutput(data={"result": json.dumps(summary)})

Callers

nothing calls this directly

Calls 4

PluginOutput (Class) · 0.85
raise_provider_api_error (Function) · 0.50
get (Method) · 0.45
json (Method) · 0.45

Tested by

no test coverage detected