hub / github.com/dmlc/dgl / init_data

Method init_data

python/dgl/distributed/kvstore.py:1155–1247

Sends a message to the kvserver to initialize a new data tensor and map that data from the server side to the client side.

(
        self, name, shape, dtype, part_policy, init_func, is_gdata=True
    )

Source

        self.barrier()

    def init_data(
        self, name, shape, dtype, part_policy, init_func, is_gdata=True
    ):
        """Send message to kvserver to initialize new data tensor and mapping this
        data from server side to client side.

        Parameters
        ----------
        name : str
            data name
        shape : list or tuple of int
            data shape
        dtype : dtype
            data type
        part_policy : PartitionPolicy
            partition policy.
        init_func : func
            UDF init function
        is_gdata : bool
            Whether the created tensor is a ndata/edata or not.
        """
        assert len(name) > 0, "name cannot be empty."
        assert len(shape) > 0, "shape cannot be empty"
        assert name not in self._data_name_list, (
            "data name: %s already exists." % name
        )
        self.barrier()
        shape = list(shape)

        # Send request to the servers to initialize data.
        # The servers may handle the duplicated initializations.
        part_shape = shape.copy()
        part_shape[0] = part_policy.get_part_size()
        request = InitDataRequest(
            name,
            tuple(part_shape),
            F.reverse_data_type_dict[dtype],
            part_policy.policy_str,
            init_func,
        )
        # The request is sent to the servers in one group, which are on the same machine.
        for n in range(self._group_count):
            server_id = part_policy.part_id * self._group_count + n
            rpc.send_request(server_id, request)
        for _ in range(self._group_count):
            response = rpc.recv_response()
            assert response.msg == INIT_MSG

        self.barrier()
        # Create local shared-data
        local_shape = shape.copy()
        local_shape[0] = part_policy.get_part_size()
        if name in self._part_policy:
            raise RuntimeError("Policy %s has already exists!" % name)
        if name in self._data_store:
            raise RuntimeError("Data %s has already exists!" % name)
        if name in self._full_data_shape:
            raise RuntimeError("Data shape %s has already exists!" % name)
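
The listing does two small pieces of arithmetic: it swaps the first dimension of the global shape for `part_policy.get_part_size()`, and it addresses the servers of one group as `part_policy.part_id * self._group_count + n`. The sketch below reproduces only that arithmetic in plain Python. The helper names `partition_shape` and `server_ids_for_partition` are hypothetical and are not part of DGL; no RPC or shared-memory behavior is modeled.

```python
from typing import List, Sequence, Tuple


def partition_shape(full_shape: Sequence[int], part_size: int) -> Tuple[int, ...]:
    """Return the per-partition shape (hypothetical helper, not a DGL API).

    Only the leading dimension changes: it becomes the number of rows
    owned by the partition. All trailing dimensions are kept as-is.
    """
    if len(full_shape) == 0:
        raise ValueError("shape cannot be empty")
    part = list(full_shape)
    part[0] = part_size
    return tuple(part)


def server_ids_for_partition(part_id: int, group_count: int) -> List[int]:
    """Return the server IDs of one group (hypothetical helper, not a DGL API).

    Mirrors the expression `part_id * group_count + n` for n in
    range(group_count) used in the request loop of the listing.
    """
    return [part_id * group_count + n for n in range(group_count)]


if __name__ == "__main__":
    # A global (1000, 16) tensor, of which this partition owns 250 rows.
    print(partition_shape((1000, 16), 250))   # (250, 16)
    # Partition 2 with 3 servers per group.
    print(server_ids_for_partition(2, 3))     # [6, 7, 8]
```

Under this numbering, the servers of a partition occupy a contiguous ID range, which is why the listing can send `self._group_count` requests in one loop and then collect the same number of responses in a second loop.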

Callers 1

start_client · Function · 0.95

Calls 6

barrier · Method · 0.95
InitDataRequest · Class · 0.85
empty_shared_mem · Function · 0.85
get_part_size · Method · 0.80
to_dlpack · Method · 0.80

Tested by 1

start_client · Function · 0.76