Normalize the contents of a tar archive. We want tar archives to be as deterministic as possible. This function will take tar archive data in a buffer and return a new buffer containing a more deterministic tar archive.
(data: io.BytesIO)
| 403 | |
| 404 | |
| 405 | def normalize_tar_archive(data: io.BytesIO) -> io.BytesIO: |
| 406 | """Normalize the contents of a tar archive. |
| 407 | |
| 408 | We want tar archives to be as deterministic as possible. This function will |
| 409 | take tar archive data in a buffer and return a new buffer containing a more |
| 410 | deterministic tar archive. |
| 411 | """ |
| 412 | members = [] |
| 413 | |
| 414 | with tarfile.open(fileobj=data) as tf: |
| 415 | for ti in tf: |
| 416 | # We don't care about directory entries. Tools can handle this fine. |
| 417 | if ti.isdir(): |
| 418 | continue |
| 419 | |
| 420 | filedata = tf.extractfile(ti) |
| 421 | if filedata is not None: |
| 422 | filedata = io.BytesIO(filedata.read()) |
| 423 | |
| 424 | members.append((ti, filedata)) |
| 425 | |
| 426 | # Sort the archive members. We put PYTHON.json first so metadata can |
| 427 | # be read without reading the entire archive. |
| 428 | def sort_key(v): |
| 429 | if v[0].name == "python/PYTHON.json": |
| 430 | return 0, v[0].name |
| 431 | else: |
| 432 | return 1, v[0].name |
| 433 | |
| 434 | members.sort(key=sort_key) |
| 435 | |
| 436 | # Normalize attributes on archive members. |
| 437 | for entry in members: |
| 438 | ti = entry[0] |
| 439 | |
| 440 | # The pax headers attribute takes priority over the other named |
| 441 | # attributes. To minimize potential for our assigns to no-op, we |
| 442 | # clear out the pax headers. We can't reset all the pax headers, |
| 443 | # as this would nullify symlinks. |
| 444 | for a in ("mtime", "uid", "uname", "gid", "gname"): |
| 445 | try: |
| 446 | ti.pax_headers.__delattr__(a) |
| 447 | except AttributeError: |
| 448 | pass |
| 449 | |
| 450 | ti.pax_headers = {} |
| 451 | |
| 452 | ti.mtime = DEFAULT_MTIME |
| 453 | ti.uid = 0 |
| 454 | ti.uname = "root" |
| 455 | ti.gid = 0 |
| 456 | ti.gname = "root" |
| 457 | |
| 458 | # Give user/group read/write on all entries. |
| 459 | ti.mode |= stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP |
| 460 | |
| 461 | # If user executable, give to group as well. |
| 462 | if ti.mode & stat.S_IXUSR: |
no outgoing calls
no test coverage detected