| 426 | |
| 427 | |
| 428 | class BoundedExecutor: |
| 429 | EXECUTOR_CLS = futures.ThreadPoolExecutor |
| 430 | |
| 431 | def __init__( |
| 432 | self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None |
| 433 | ): |
| 434 | """An executor implementation that has a maximum queued up tasks |
| 435 | |
| 436 | The executor will block if the number of tasks that have been |
| 437 | submitted and is currently working on is past its maximum. |
| 438 | |
| 439 | :params max_size: The maximum number of inflight futures. An inflight |
| 440 | future means that the task is either queued up or is currently |
| 441 | being executed. A size of None or 0 means that the executor will |
| 442 | have no bound in terms of the number of inflight futures. |
| 443 | |
| 444 | :params max_num_threads: The maximum number of threads the executor |
| 445 | uses. |
| 446 | |
| 447 | :type tag_semaphores: dict |
| 448 | :params tag_semaphores: A dictionary where the key is the name of the |
| 449 | tag and the value is the semaphore to use when limiting the |
| 450 | number of tasks the executor is processing at a time. |
| 451 | |
| 452 | :type executor_cls: BaseExecutor |
| 453 | :param underlying_executor_cls: The executor class that |
| 454 | get bounded by this executor. If None is provided, the |
| 455 | concurrent.futures.ThreadPoolExecutor class is used. |
| 456 | """ |
| 457 | self._max_num_threads = max_num_threads |
| 458 | if executor_cls is None: |
| 459 | executor_cls = self.EXECUTOR_CLS |
| 460 | self._executor = executor_cls(max_workers=self._max_num_threads) |
| 461 | self._semaphore = TaskSemaphore(max_size) |
| 462 | self._tag_semaphores = tag_semaphores |
| 463 | |
| 464 | def submit(self, task, tag=None, block=True): |
| 465 | """Submit a task to complete |
| 466 | |
| 467 | :type task: s3transfer.tasks.Task |
| 468 | :param task: The task to run __call__ on |
| 469 | |
| 470 | |
| 471 | :type tag: s3transfer.futures.TaskTag |
| 472 | :param tag: An optional tag to associate to the task. This |
| 473 | is used to override which semaphore to use. |
| 474 | |
| 475 | :type block: boolean |
| 476 | :param block: True if to wait till it is possible to submit a task. |
| 477 | False, if not to wait and raise an error if not able to submit |
| 478 | a task. |
| 479 | |
| 480 | :returns: The future associated to the submitted task |
| 481 | """ |
| 482 | semaphore = self._semaphore |
| 483 | # If a tag was provided, use the semaphore associated to that |
| 484 | # tag. |
| 485 | if tag: |
no outgoing calls