(tests: Any)
| 50 | |
| 51 | |
| 52 | def decode_tests(tests: Any) -> list: |
| 53 | if not tests: |
| 54 | return [] |
| 55 | if isinstance(tests, list): |
| 56 | return tests |
| 57 | if isinstance(tests, str): |
| 58 | try: |
| 59 | # First, try to decode as a plain JSON string |
| 60 | return json.loads(tests) |
| 61 | except json.JSONDecodeError: |
| 62 | # If that fails, try to decode from the compressed format |
| 63 | try: |
| 64 | b64_decoded = base64.b64decode(tests.encode("utf-8")) |
| 65 | |
| 66 | # Use a streaming decompressor to handle potentially very large test cases |
| 67 | # without allocating a massive buffer upfront. This is more memory-efficient. |
| 68 | decompressor = zlib.decompressobj() |
| 69 | decompressed_chunks = [] |
| 70 | total_decompressed_size = 0 |
| 71 | |
| 72 | # Process in chunks to avoid holding the entire decompressed data in memory at once. |
| 73 | chunk_size = 256 * 1024 # 256KB chunks |
| 74 | for i in range(0, len(b64_decoded), chunk_size): |
| 75 | chunk = b64_decoded[i : i + chunk_size] |
| 76 | decompressed_chunk = decompressor.decompress(chunk) |
| 77 | total_decompressed_size += len(decompressed_chunk) |
| 78 | decompressed_chunks.append(decompressed_chunk) |
| 79 | |
| 80 | decompressed_chunks.append(decompressor.flush()) |
| 81 | |
| 82 | decompressed_data = b"".join(decompressed_chunks) |
| 83 | return pickle.loads(decompressed_data) |
| 84 | except Exception: |
| 85 | # Log the problematic data before returning an empty list |
| 86 | logger.error(f"Failed to decode test data: {tests}") |
| 87 | return [] |
| 88 | return [] |
| 89 | |
| 90 | |
| 91 | # ------------------------------------------------------------- |
no outgoing calls