MCPcopy
hub / github.com/NVIDIA/TensorRT-LLM / IpcMemory

Class IpcMemory

tensorrt_llm/_ipc_utils.py:68–149  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

66
67
68class IpcMemory:
69 # WARNING: Must be kept in sync with FLAGS_SIZE in cpp/include/tensorrt_llm/runtime/ipcUtils.h
70 # (Max all reduce blocks + 1) * sizeof(int)
71 IPC_BARRIERS_SIZE_PER_GPU = (24 + 1) * 4
72
73 def __init__(self, mapping: Mapping, size: int, open_ipc: bool = True):
74 self.mapping = mapping
75 self.open_ipc = open_ipc and mapping.tp_size <= mapping.gpus_per_node
76 self.peer_ptrs = [0] * mapping.tp_size
77 self.local_ptr = 0
78
79 if self.open_ipc:
80 self.peer_ptrs, self.local_ptr = IpcMemory.open_ipc_memory(self.mapping, size, True)
81
82 def __del__(self):
83 if not sys.is_finalizing() and self.open_ipc:
84 IpcMemory.close_ipc_memory(self.mapping, self.peer_ptrs)
85
86 def serialize(self) -> List[int]:
87 buffer = bytes(0)
88 for ptr in self.peer_ptrs:
89 buffer += struct.pack("P", ptr)
90
91 return array.array("Q", buffer).tolist()
92
93 @staticmethod
94 def open_ipc_memory(
95 mapping: Mapping, size: int, set_to_zero: bool = False
96 ) -> Tuple[List[int], int]:
97 """Allocates a buffer with the given *size* on each GPU. Then, enables IPC communication between TP groups.
98 Returns a list of buffer pointers, buffers[i] is a handle to the corresponding buffer residing on GPU #i.
99 Call close_ipc_handle with the *buffer*.
100 """
101
102 def align_size(size, alignment):
103 if (size % alignment) != 0:
104 size += alignment - (size % alignment)
105 return size
106
107 from tensorrt_llm._torch.distributed.communicator import Distributed
108
109 dist = Distributed.get(mapping)
110
111 # see allocateIpcMemory in cpp/tensorrt_llm/runtime/ipcUtils.cpp for alignment reason
112 # 1 << 21 is 2MB
113 aligned_size = align_size(size, 1 << 21)
114 error, local_ptr = cudart.cudaMalloc(aligned_size)
115 _raise_if_error(error)
116 if set_to_zero:
117 _raise_if_error(cudart.cudaMemset(local_ptr, 0, aligned_size)[0])
118 error, local_handle = cudart.cudaIpcGetMemHandle(local_ptr)
119 _raise_if_error(error)
120 handles_reserved = dist.tp_allgather(local_handle.reserved)
121
122 handles = []
123 for reserved in handles_reserved:
124 handle = cudart.cudaIpcMemHandle_t()
125 handle.reserved = reserved

Calls

no outgoing calls

Tested by

no test coverage detected