| 472 | |
| 473 | |
| 474 | class Http2Client(Http2Connection): |
| 475 | h2_conf = h2.config.H2Configuration( |
| 476 | **Http2Connection.h2_conf_defaults, |
| 477 | client_side=True, |
| 478 | ) |
| 479 | |
| 480 | ReceiveProtocolError = ResponseProtocolError |
| 481 | ReceiveData = ResponseData |
| 482 | ReceiveTrailers = ResponseTrailers |
| 483 | ReceiveEndOfMessage = ResponseEndOfMessage |
| 484 | |
| 485 | our_stream_id: dict[int, int] |
| 486 | their_stream_id: dict[int, int] |
| 487 | stream_queue: collections.defaultdict[int, list[Event]] |
| 488 | """Queue of streams that we haven't sent yet because we have reached MAX_CONCURRENT_STREAMS""" |
| 489 | provisional_max_concurrency: int | None = 10 |
| 490 | """A provisional concurrency limit before we get the server's first settings frame.""" |
| 491 | last_activity: float |
| 492 | """Timestamp of when we've last seen network activity on this connection.""" |
| 493 | |
| 494 | def __init__(self, context: Context): |
| 495 | super().__init__(context, context.server) |
| 496 | # Disable HTTP/2 push for now to keep things simple. |
| 497 | # Don't send the setting here; that is done as part of initiate_connection(). |
| 498 | self.h2_conn.local_settings.enable_push = 0 |
| 499 | # hyper-h2 pitfall: we need to acknowledge here, otherwise it sends out the old settings. |
| 500 | self.h2_conn.local_settings.acknowledge() |
| 501 | self.our_stream_id = {} |
| 502 | self.their_stream_id = {} |
| 503 | self.stream_queue = collections.defaultdict(list) |
| 504 | |
| 505 | def _handle_event(self, event: Event) -> CommandGenerator[None]: |
| 506 | # We can't reuse stream ids from the client because they may arrive reordered here |
| 507 | # and HTTP/2 forbids opening a stream on a lower id than what was previously sent (see test_stream_concurrency). |
| 508 | # To mitigate this, we transparently map the outside's stream id to our stream id. |
| 509 | if isinstance(event, HttpEvent): |
| 510 | ours = self.our_stream_id.get(event.stream_id, None) |
| 511 | if ours is None: |
| 512 | no_free_streams = self.h2_conn.open_outbound_streams >= ( |
| 513 | self.provisional_max_concurrency |
| 514 | or self.h2_conn.remote_settings.max_concurrent_streams |
| 515 | ) |
| 516 | if no_free_streams: |
| 517 | self.stream_queue[event.stream_id].append(event) |
| 518 | return |
| 519 | ours = self.h2_conn.get_next_available_stream_id() |
| 520 | self.our_stream_id[event.stream_id] = ours |
| 521 | self.their_stream_id[ours] = event.stream_id |
| 522 | event.stream_id = ours |
| 523 | |
| 524 | for cmd in self._handle_event2(event): |
| 525 | if isinstance(cmd, ReceiveHttp): |
| 526 | cmd.event.stream_id = self.their_stream_id[cmd.event.stream_id] |
| 527 | yield cmd |
| 528 | |
| 529 | can_resume_queue = self.stream_queue and self.h2_conn.open_outbound_streams < ( |
| 530 | self.provisional_max_concurrency |
| 531 | or self.h2_conn.remote_settings.max_concurrent_streams |
no outgoing calls
searching dependent graphs…