Create a flow for testing.
(
*,
client_conn: connection.Client | None = None,
server_conn: connection.Server | None = None,
req: http.Request | None = None,
resp: bool | http.Response = False,
err: bool | flow.Error = False,
ws: bool | websocket.WebSocketData = False,
live: bool = True,
)
| 158 | |
| 159 | |
| 160 | def tflow( |
| 161 | *, |
| 162 | client_conn: connection.Client | None = None, |
| 163 | server_conn: connection.Server | None = None, |
| 164 | req: http.Request | None = None, |
| 165 | resp: bool | http.Response = False, |
| 166 | err: bool | flow.Error = False, |
| 167 | ws: bool | websocket.WebSocketData = False, |
| 168 | live: bool = True, |
| 169 | ) -> http.HTTPFlow: |
| 170 | """Create a flow for testing.""" |
| 171 | if client_conn is None: |
| 172 | client_conn = tclient_conn() |
| 173 | if server_conn is None: |
| 174 | server_conn = tserver_conn() |
| 175 | if req is None: |
| 176 | req = treq() |
| 177 | |
| 178 | if resp is True: |
| 179 | resp = tresp() |
| 180 | if err is True: |
| 181 | err = terr() |
| 182 | if ws is True: |
| 183 | ws = twebsocket() |
| 184 | |
| 185 | assert resp is False or isinstance(resp, http.Response) |
| 186 | assert err is False or isinstance(err, flow.Error) |
| 187 | assert ws is False or isinstance(ws, websocket.WebSocketData) |
| 188 | |
| 189 | f = http.HTTPFlow(client_conn, server_conn) |
| 190 | f.timestamp_created = req.timestamp_start |
| 191 | f.request = req |
| 192 | f.response = resp or None |
| 193 | f.error = err or None |
| 194 | f.websocket = ws or None |
| 195 | f.live = live |
| 196 | return f |
| 197 | |
| 198 | |
| 199 | class DummyFlow(flow.Flow): |
searching dependent graphs…