MCPcopy
hub / github.com/langbot-app/LangBot / execute

Method execute

src/langbot/pkg/command/cmdmgr.py:76–120  ·  view source on GitHub ↗

执行命令

(
        self,
        command_text: str,
        full_command_text: str,
        query: pipeline_query.Query,
        session: provider_session.Session,
    )

Source from the content-addressed store, hash-verified

74 yield command_context.CommandReturn(error=command_errors.CommandNotFoundError(context.command))
75
76 async def execute(
77 self,
78 command_text: str,
79 full_command_text: str,
80 query: pipeline_query.Query,
81 session: provider_session.Session,
82 ) -> typing.AsyncGenerator[command_context.CommandReturn, None]:
83 """执行命令"""
84
85 privilege = 1
86
87 import sqlalchemy as _sa
88 from ..entity.persistence.bot import BotAdmin as _BotAdmin
89
90 _admins = await self.ap.persistence_mgr.execute_async(
91 _sa.select(_BotAdmin).where(
92 _BotAdmin.bot_uuid == (query.bot_uuid or ''),
93 _BotAdmin.launcher_type == query.launcher_type.value,
94 _BotAdmin.launcher_id == str(query.launcher_id),
95 )
96 )
97 if _admins.first() is not None:
98 privilege = 2
99
100 ctx = command_context.ExecuteContext(
101 query_id=query.query_id,
102 session=session,
103 command_text=command_text,
104 full_command_text=full_command_text,
105 command='',
106 crt_command='',
107 params=command_text.split(' '),
108 crt_params=command_text.split(' '),
109 privilege=privilege,
110 )
111
112 ctx.command = ctx.params[0]
113
114 ctx.shift()
115
116 # Get bound plugins from query
117 bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
118
119 async for ret in self._execute(ctx, self.cmd_list, bound_plugins=bound_plugins):
120 yield ret

Calls 4

_executeMethod · 0.95
strFunction · 0.85
execute_asyncMethod · 0.45
firstMethod · 0.45