(
cmd: Sequence[str],
varargs: Sequence[str],
target_concurrency: int,
_max_length: int | None = None,
)
| 74 | |
| 75 | |
def partition(
    cmd: Sequence[str],
    varargs: Sequence[str],
    target_concurrency: int,
    _max_length: int | None = None,
) -> tuple[tuple[str, ...], ...]:
    """Split ``varargs`` into command lines that each start with ``cmd``.

    Arguments are kept in their original order.  A new command line is
    started whenever adding the next argument would push the measured
    length past ``_max_length`` or the current command line already holds
    ``max_args`` arguments.

    Args:
        cmd: The fixed command prefix placed at the start of every
            returned command line.
        varargs: The arguments to distribute across command lines.
        target_concurrency: Number of partitions to aim for; used only to
            derive the per-partition argument cap.
        _max_length: Upper bound on the measured length of one command
            line.  A falsy value (``None`` or ``0``) falls back to
            ``_get_platform_max_length()``.

    Returns:
        A tuple of command lines, each a tuple made of ``cmd`` followed by
        a run of ``varargs``.  At least one command line is always
        returned; with empty ``varargs`` it is the bare ``cmd``.

    Raises:
        ArgumentTooLongError: If a single argument does not fit next to
            ``cmd`` within ``_max_length``.
        ZeroDivisionError: If ``target_concurrency`` is ``0``.
    """
    _max_length = _max_length or _get_platform_max_length()

    # Generally, we try to partition evenly into at least `target_concurrency`
    # partitions, but we don't want a bunch of tiny partitions.
    max_args = max(4, math.ceil(len(varargs) / target_concurrency))

    prefix = tuple(cmd)
    # Every partition starts from the same measured length, so compute the
    # cost of the bare command once instead of on every flush.
    base_length = _command_length(*prefix) + 1

    partitions: list[tuple[str, ...]] = []
    current: list[str] = []
    total_length = base_length

    for arg in varargs:
        arg_length = _command_length(arg) + 1

        if current and (
            total_length + arg_length > _max_length or
            len(current) >= max_args
        ):
            # The current partition is full: emit it and start a fresh one
            # for this argument.
            partitions.append(prefix + tuple(current))
            current = []
            total_length = base_length

        # An empty partition can never hit the argument cap (max_args >= 4),
        # so if the argument still does not fit it is too long on its own.
        if total_length + arg_length > _max_length:
            raise ArgumentTooLongError(arg)

        current.append(arg)
        total_length += arg_length

    # Emit the trailing partition (the bare command when varargs is empty).
    partitions.append(prefix + tuple(current))

    return tuple(partitions)
| 118 | |
| 119 | |
| 120 | @contextlib.contextmanager |
no test coverage detected