MCPcopy Index your code
hub / github.com/mitmproxy/mitmproxy / flow_set

Method flow_set

mitmproxy/addons/core.py:132–182  ·  view source on GitHub ↗

Quickly set a number of common values on flows.

(self, flows: Sequence[flow.Flow], attr: str, value: str)

Source from the content-addressed store, hash-verified

@command.command("flow.set")
@command.argument("attr", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(self, flows: Sequence[flow.Flow], attr: str, value: str) -> None:
    """
    Quickly set a number of common values on flows.

    "method", "host", "path" and "url" are applied to a flow's request;
    "status_code" and "reason" are applied to its response. A flow that
    lacks the relevant message is left untouched. Setting "status_code"
    also replaces the reason phrase when the code is a key of
    status_codes.RESPONSES.

    Only flows that were actually modified are passed to the update hook
    and counted in the log message.

    Raises CommandError if the status code is not an integer or if the
    request rejects the URL with a ValueError.
    """
    val: int | str = value
    if attr == "status_code":
        # Convert once up front so a bad value fails before any flow is touched.
        try:
            val = int(value)
        except ValueError as v:
            raise exceptions.CommandError(
                "Status code is not an integer: %s" % val
            ) from v

    updated = []
    for f in flows:
        # Becomes True only when an attribute is really assigned on this flow.
        # (Previously a stray `self.rupdate = False` wrote to the addon
        # instance instead of the local flag, so every flow was reported as
        # updated regardless of whether anything had been set.)
        changed = False

        # Not every flow type carries a request/response, hence getattr.
        req = getattr(f, "request", None)
        if req:
            if attr == "method":
                req.method = val
                changed = True
            elif attr == "host":
                req.host = val
                changed = True
            elif attr == "path":
                req.path = val
                changed = True
            elif attr == "url":
                try:
                    req.url = val
                except ValueError as e:
                    raise exceptions.CommandError(
                        f"URL {val!r} is invalid: {e}"
                    ) from e
                changed = True

        resp = getattr(f, "response", None)
        if resp:
            if attr == "status_code":
                resp.status_code = val
                # Keep the reason phrase in sync for known status codes.
                if val in status_codes.RESPONSES:
                    resp.reason = status_codes.RESPONSES[val]  # type: ignore
                changed = True
            elif attr == "reason":
                resp.reason = val
                changed = True

        if changed:
            updated.append(f)

    ctx.master.addons.trigger(hooks.UpdateHook(updated))
    logger.log(ALERT, f"Set {attr} on {len(updated)} flows.")
183
184 @command.command("flow.decode")
185 def decode(self, flows: Sequence[flow.Flow], part: str) -> None:

Callers 1

test_flow_set · Function · 0.95

Calls 3

trigger · Method · 0.80
append · Method · 0.45
log · Method · 0.45

Tested by 1

test_flow_set · Function · 0.76