MCPcopy
hub / github.com/SteamDeckHomebrew/decky-loader / __init__

Method __init__

backend/decky_loader/loader.py:70–102  ·  view source on GitHub ↗
(self, server_instance: PluginManager, ws: WSRouter, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False)

Source from the content-addressed store, hash-verified

68
69class Loader:
def __init__(self, server_instance: PluginManager, ws: WSRouter, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False) -> None:
    """Initialise loader state, start background tasks and register routes.

    Args:
        server_instance: Plugin manager whose ``web_app`` and ``ws`` receive
            the loader's HTTP and websocket routes; kept as ``self.context``.
        ws: Websocket router, stored as ``self.ws``.
        plugin_path: Path stored as ``self.plugin_path``; logged at start-up
            and, when ``live_reload`` is set, watched recursively.
        loop: Event loop on which the background tasks are created.
        live_reload: When true, start an ``Observer`` with a
            ``FileChangeHandler`` feeding ``self.reload_queue`` and schedule
            ``enable_reload_wait``.
    """
    self.loop = loop
    self.logger = getLogger("Loader")
    self.ws = ws
    self.plugin_path = plugin_path
    self.logger.info(f"plugin_path: {self.plugin_path}")
    self.plugins: Plugins = {}
    self.watcher = None
    self.live_reload = live_reload
    self.reload_queue: ReloadQueue = Queue()
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so a fire-and-forget task may be garbage-collected before
    # it finishes.
    self._handle_reloads_task = self.loop.create_task(self.handle_reloads())
    # Only populated when live reload is enabled (see below).
    self._enable_reload_wait_task = None
    self.context: PluginManager = server_instance

    if live_reload:
        # self.observer only exists when live reload is enabled.
        self.observer = Observer()
        self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
        self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # pyright: ignore [reportUnknownMemberType]
        self.observer.start()
        # Same reasoning as above: hold the task so it cannot be collected.
        self._enable_reload_wait_task = self.loop.create_task(self.enable_reload_wait())

    # HTTP routes serving the loader frontend and per-plugin files.
    server_instance.web_app.add_routes([
        web.get("/frontend/{path:.*}", self.handle_frontend_assets),
        web.get("/locales/{path:.*}", self.handle_frontend_locales),
        web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle),
        web.get("/plugins/{plugin_name}/dist/{path:.*}", self.handle_plugin_dist),
        web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_plugin_frontend_assets),
        web.get("/plugins/{plugin_name}/data/{path:.*}", self.handle_plugin_frontend_assets_from_data),
    ])

    # NOTE(review): websocket routes are registered on server_instance.ws,
    # while the ``ws`` argument is only stored on self.ws — confirm both
    # refer to the same router.
    server_instance.ws.add_route("loader/get_plugins", self.get_plugins)
    server_instance.ws.add_route("loader/reload_plugin", self.handle_plugin_backend_reload)
    server_instance.ws.add_route("loader/call_plugin_method", self.handle_plugin_method_call)
    server_instance.ws.add_route("loader/call_legacy_plugin_method", self.handle_plugin_method_call_legacy)
103
async def shutdown_plugins(self):
    """Call ``stop()`` on every entry of ``self.plugins`` and await them together."""
    pending_stops = (plugin.stop() for plugin in self.plugins.values())
    await gather(*pending_stops)

Callers 1

__init__Method · 0.45

Calls 5

handle_reloadsMethod · 0.95
enable_reload_waitMethod · 0.95
FileChangeHandlerClass · 0.85
startMethod · 0.80
add_routeMethod · 0.80

Tested by

no test coverage detected