MCPcopy Index your code
hub / github.com/TaskingAI/TaskingAI / sync_plugin_data

Function sync_plugin_data

backend/app/services/tool/plugin/cache.py:25–99  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

23
24
25async def sync_plugin_data():
26 logger.debug(f"sync_plugin_data started!")
27
28 global _bundle_checksum, _plugin_checksum, _i18n_checksum
29 async with aiohttp.ClientSession() as session:
30 response = await session.get(
31 f"{CONFIG.TASKINGAI_PLUGIN_URL}/v1/cache_checksums",
32 )
33 response_wrapper = ResponseWrapper(response.status, await response.json())
34 check_http_error(response_wrapper)
35 response_data = response_wrapper.json()["data"]
36 bundle_checksum = response_data["bundle_checksum"]
37 plugin_checksum = response_data["plugin_checksum"]
38 i18n_checksum = response_data["i18n_checksum"]
39 if (
40 bundle_checksum == _bundle_checksum
41 and plugin_checksum == _plugin_checksum
42 and i18n_checksum == _i18n_checksum
43 ):
44 logger.debug(f"Checksums are the same, no need to sync plugin data.")
45 return
46
47 async with aiohttp.ClientSession() as session:
48 response = await session.get(
49 f"{CONFIG.TASKINGAI_PLUGIN_URL}/v1/caches",
50 )
51 response_wrapper = ResponseWrapper(response.status, await response.json())
52 check_http_error(response_wrapper)
53 response_data = response_wrapper.json()["data"]
54
55 # sync i18n
56 global _i18n_dict
57 _i18n_dict = response_data["i18n"]
58
59 # sort plugins by bundle_id, then plugin_id
60 plugins = [Plugin.build(plugin_data) for plugin_data in response_data["plugins"]]
61 plugins.sort(key=lambda x: (x.bundle_id, x.plugin_id))
62
63 num_plugins_dict = {}
64 for plugin in plugins:
65 num_plugins_dict[plugin.bundle_id] = num_plugins_dict.get(plugin.bundle_id, 0) + 1
66
67 # sort bundles by bundle_id
68 bundles = [Bundle.build(bundle_data) for bundle_data in response_data["bundles"]]
69 bundles.sort(key=lambda x: x.bundle_id)
70
71 for bundle in bundles:
72 bundle.num_plugins = num_plugins_dict.get(bundle.bundle_id, 0)
73
74 # index bundles by bundle_id
75 bundle_dict = {bundle.bundle_id: bundle for bundle in bundles}
76
77 plugin_dict = {f"{plugin.bundle_id}:{plugin.plugin_id}": plugin for plugin in plugins}
78
79 bundle_plugin_dict = {plugin.bundle_id: [] for plugin in plugins}
80 for plugin in plugins:
81 bundle_plugin_dict[plugin.bundle_id].append(plugin)
82 for bundle_id in bundle_plugin_dict:

Callers 1

sync_data · Function · 0.90

Calls 5

json · Method · 0.95
ResponseWrapper · Class · 0.90
check_http_error · Function · 0.90
get · Method · 0.45
build · Method · 0.45

Tested by

no test coverage detected