MCPcopy Index your code
hub / github.com/TaskingAI/TaskingAI / load_plugin_data

Function load_plugin_data

plugin/app/cache/plugin.py:26–78  ·  view source on GitHub ↗

Load plugin data from YAML files. :param bundle_ids: a list of bundle ids. :return: a dictionary of bundle ids and a list of plugin ids.

(bundle_ids: List[str])

Source from the content-addressed store, hash-verified

24
25
26def load_plugin_data(bundle_ids: List[str]) -> Dict[str, List[str]]:
27
28 """
29 Load plugin data from YAML files.
30 :param bundle_ids: a list of bundle ids.
31 :return: a dictionary of bundle ids and a list of plugin ids.
32 """
33 global __plugins, __bundle_plugin_list, __bundle_plugin_dict, __plugin_cache, __plugin_schema_checksum
34 bundle_plugin_ids = {}
35
36 bundles_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../../bundles")
37
38 # Iterate through each file in the directory
39 for bundle_id in bundle_ids:
40
41 plugin_dir_path = os.path.join(bundles_path, bundle_id, "plugins")
42 # List all entries in the plugins directory that are directories
43 plugin_ids = [i for i in os.listdir(plugin_dir_path) if os.path.isdir(os.path.join(plugin_dir_path, i))]
44 pattern = re.compile(r'^[a-z0-9][a-z0-9_]*$')
45 plugin_ids = [i for i in plugin_ids if pattern.match(i)]
46
47 __bundle_plugin_list[bundle_id] = []
48 __bundle_plugin_dict[bundle_id] = {}
49 bundle_plugin_ids[bundle_id] = []
50
51 for plugin_id in plugin_ids:
52 file_path = os.path.join(plugin_dir_path, plugin_id, "plugin_schema.yml")
53
54 # Check if file is not empty
55 if os.path.getsize(file_path) > 0:
56 try:
57
58 # Open and read the file
59 with open(file_path, "r") as file:
60 plugin_data = yaml.safe_load(file) # Use yaml.safe_load to load YAML data
61
62 # Process the data
63 plugin = Plugin.build(bundle_id, plugin_data)
64 __plugins.append(plugin)
65 __bundle_plugin_list[bundle_id].append(plugin)
66 __bundle_plugin_dict[bundle_id][plugin_id] = plugin
67 bundle_plugin_ids[bundle_id].append(plugin_id)
68 except yaml.YAMLError as e:
69 print(f"Error loading YAML from file {file_path}: {e}")
70 else:
71 print(f"Skipping empty file: {file_path}")
72
73 __bundle_plugin_list[bundle_id].sort(key=lambda x: x.plugin_id)
74 bundle_plugin_ids[bundle_id].sort()
75
76 __plugin_cache = [plugin.to_dict(lang=None) for plugin in __plugins]
77 __plugin_schema_checksum = checksum(__plugin_cache)
78 return bundle_plugin_ids
79
80
81def list_plugins(bundle_id: Optional[str]) -> List[Plugin]:

Callers 1

lifespanFunction · 0.90

Calls 3

checksumFunction · 0.90
buildMethod · 0.45
to_dictMethod · 0.45

Tested by

no test coverage detected