| 66 | |
| 67 | |
| 68 | class IpcMemory: |
| 69 | # WARNING: Must be kept in sync with FLAGS_SIZE in cpp/include/tensorrt_llm/runtime/ipcUtils.h |
| 70 | # (Max all reduce blocks + 1) * sizeof(int) |
| 71 | IPC_BARRIERS_SIZE_PER_GPU = (24 + 1) * 4 |
| 72 | |
def __init__(self, mapping: Mapping, size: int, open_ipc: bool = True):
    """Set up the per-instance pointer table and optionally open IPC memory.

    Args:
        mapping: Parallelism description; ``tp_size`` and ``gpus_per_node``
            are read from it.
        size: Requested buffer size, forwarded to ``open_ipc_memory``.
        open_ipc: Request IPC memory. The request is only honoured when
            ``mapping.tp_size <= mapping.gpus_per_node``.
    """
    self.mapping = mapping
    # IPC is used only when requested and tp_size does not exceed gpus_per_node.
    self.open_ipc = open_ipc and mapping.tp_size <= mapping.gpus_per_node
    # Placeholder values used when IPC memory is not opened.
    self.peer_ptrs = [0] * mapping.tp_size
    self.local_ptr = 0

    if not self.open_ipc:
        return

    # Third positional argument is ``set_to_zero``.
    opened_ptrs, own_ptr = IpcMemory.open_ipc_memory(self.mapping, size, True)
    self.peer_ptrs, self.local_ptr = opened_ptrs, own_ptr
| 81 | |
def __del__(self):
    """Close the IPC memory held by this instance when it is finalized.

    Does nothing while the interpreter is shutting down, or when IPC memory
    was not opened for this instance.
    """
    # __del__ also runs for an instance whose __init__ raised before
    # ``open_ipc`` was assigned (e.g. a failing attribute lookup on
    # ``mapping``). Default to False so the finalizer does not itself raise
    # AttributeError on such a partially initialized object.
    if sys.is_finalizing() or not getattr(self, "open_ipc", False):
        return
    IpcMemory.close_ipc_memory(self.mapping, self.peer_ptrs)
| 85 | |
def serialize(self) -> List[int]:
    """Return ``self.peer_ptrs`` as a plain list of integers.

    Each pointer is packed with the native pointer format ``"P"``; the
    concatenated bytes are then read back as an ``array`` of type code
    ``"Q"`` (unsigned long long) and converted to a list.
    """
    packed = b"".join(struct.pack("P", address) for address in self.peer_ptrs)
    return array.array("Q", packed).tolist()
| 92 | |
| 93 | @staticmethod |
| 94 | def open_ipc_memory( |
| 95 | mapping: Mapping, size: int, set_to_zero: bool = False |
| 96 | ) -> Tuple[List[int], int]: |
| 97 | """Allocates a buffer with the given *size* on each GPU. Then, enables IPC communication between TP groups. |
| 98 | Returns a list of buffer pointers, buffers[i] is a handle to the corresponding buffer residing on GPU #i. |
| 99 | Call close_ipc_handle with the *buffer*. |
| 100 | """ |
| 101 | |
| 102 | def align_size(size, alignment): |
| 103 | if (size % alignment) != 0: |
| 104 | size += alignment - (size % alignment) |
| 105 | return size |
| 106 | |
| 107 | from tensorrt_llm._torch.distributed.communicator import Distributed |
| 108 | |
| 109 | dist = Distributed.get(mapping) |
| 110 | |
| 111 | # see allocateIpcMemory in cpp/tensorrt_llm/runtime/ipcUtils.cpp for alignment reason |
| 112 | # 1 << 21 is 2MB |
| 113 | aligned_size = align_size(size, 1 << 21) |
| 114 | error, local_ptr = cudart.cudaMalloc(aligned_size) |
| 115 | _raise_if_error(error) |
| 116 | if set_to_zero: |
| 117 | _raise_if_error(cudart.cudaMemset(local_ptr, 0, aligned_size)[0]) |
| 118 | error, local_handle = cudart.cudaIpcGetMemHandle(local_ptr) |
| 119 | _raise_if_error(error) |
| 120 | handles_reserved = dist.tp_allgather(local_handle.reserved) |
| 121 | |
| 122 | handles = [] |
| 123 | for reserved in handles_reserved: |
| 124 | handle = cudart.cudaIpcMemHandle_t() |
| 125 | handle.reserved = reserved |
no outgoing calls
no test coverage detected