hub / github.com/mitmproxy/mitmproxy / reply

Class reply

test/mitmproxy/proxy/tutils.py:307–359


class reply(events.Event):
    args: tuple[Any, ...]
    to: commands.Command | type[commands.Command] | int
    side_effect: Callable[[Any], Any]

    def __init__(
        self,
        *args,
        to: commands.Command | type[commands.Command] | int = -1,
        side_effect: Callable[[Any], None] = lambda x: None,
    ):
        """Utility method to reply to the latest hook in playbooks."""
        assert not args or not isinstance(args[0], commands.Command)
        self.args = args
        self.to = to
        self.side_effect = side_effect

    def playbook_eval(self, playbook: Playbook) -> events.CommandCompleted:
        if isinstance(self.to, int):
            expected = playbook.expected[: playbook.expected.index(self)]
            assert abs(self.to) < len(expected)
            to = expected[self.to]
            if not isinstance(to, commands.Command):
                raise AssertionError(f"There is no command at offset {self.to}: {to}")
            else:
                self.to = to
        elif isinstance(self.to, type):
            for cmd in reversed(playbook.actual):
                if isinstance(cmd, self.to):
                    assert isinstance(cmd, commands.Command)
                    self.to = cmd
                    break
            else:
                raise AssertionError(f"There is no command of type {self.to}.")
        for cmd in reversed(playbook.actual):
            if eq(self.to, cmd):
                self.to = cmd
                break
        else:
            raise AssertionError(f"Expected command {self.to} did not occur.")

        assert isinstance(self.to, commands.Command)
        if isinstance(self.to, commands.StartHook):
            self.side_effect(*self.to.args())
            reply_cls = events.HookCompleted
        else:
            self.side_effect(self.to)
            reply_cls = command_reply_subclasses[type(self.to)]
        try:
            inst = reply_cls(self.to, *self.args)
        except TypeError as e:
            raise ValueError(f"Cannot instantiate {reply_cls.__name__}: {e}")
        return inst


T = TypeVar("T")
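
The way playbook_eval resolves the `to` argument (an integer offset into the steps declared before the reply, or a command type matched against the most recent actual command) can be illustrated with a small standalone sketch. This is not mitmproxy code: `Command`, `Event`, `OpenConnection`, `SendData`, `DataReceived`, `resolve_by_offset` and `resolve_by_type` are hypothetical stand-ins invented here so the example runs without the mitmproxy test suite. It only mirrors the two lookup branches and leaves out the later `eq` matching and reply instantiation.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for commands.Command and events.Event.
# They exist only to make this sketch self-contained.
class Command:
    pass


class Event:
    pass


@dataclass
class OpenConnection(Command):
    host: str


@dataclass
class SendData(Command):
    data: bytes


@dataclass
class DataReceived(Event):
    data: bytes


def resolve_by_offset(expected_before: list, offset: int) -> Command:
    """Mirror the integer branch: index into the steps declared before the reply."""
    # Same strict bound as the listing: abs(offset) must be smaller than the
    # number of preceding steps.
    assert abs(offset) < len(expected_before)
    target = expected_before[offset]
    if not isinstance(target, Command):
        raise AssertionError(f"There is no command at offset {offset}: {target}")
    return target


def resolve_by_type(actual: list, cmd_type: type) -> Command:
    """Mirror the type branch: pick the most recent actual command of that type."""
    for cmd in reversed(actual):
        if isinstance(cmd, cmd_type):
            return cmd
    raise AssertionError(f"There is no command of type {cmd_type}.")


# Example steps: one event followed by two commands.
steps = [DataReceived(b"hi"), OpenConnection("example.com"), SendData(b"hi")]
latest = resolve_by_offset(steps, -1)             # the SendData command
by_type = resolve_by_type(steps, OpenConnection)  # the OpenConnection command
```

With the default offset of -1 the lookup lands on the step immediately before the reply, which is why a bare `reply()` answers the latest command. An offset that points at an event instead of a command (offset 0 in the example steps) raises AssertionError, as in the listing.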

Callers 15

test_upgrade · Function · 0.90
test_upgrade_streamed · Function · 0.90
test_modify_message · Function · 0.90
test_empty_message · Function · 0.90
test_drop_message · Function · 0.90
test_fragmented · Function · 0.90
test_unfragmented · Function · 0.90
test_protocol_error · Function · 0.90

Calls

no outgoing calls

Tested by 15

test_upgrade · Function · 0.72
test_upgrade_streamed · Function · 0.72
test_modify_message · Function · 0.72
test_empty_message · Function · 0.72
test_drop_message · Function · 0.72
test_fragmented · Function · 0.72
test_unfragmented · Function · 0.72
test_protocol_error · Function · 0.72
