Send message to kvserver to initialize new data tensor and mapping this data from server side to client side. Parameters ---------- name : str data name shape : list or tuple of int data shape dtype : dtype data typ
(
self, name, shape, dtype, part_policy, init_func, is_gdata=True
)
| 1153 | self.barrier() |
| 1154 | |
| 1155 | def init_data( |
| 1156 | self, name, shape, dtype, part_policy, init_func, is_gdata=True |
| 1157 | ): |
| 1158 | """Send message to kvserver to initialize new data tensor and mapping this |
| 1159 | data from server side to client side. |
| 1160 | |
| 1161 | Parameters |
| 1162 | ---------- |
| 1163 | name : str |
| 1164 | data name |
| 1165 | shape : list or tuple of int |
| 1166 | data shape |
| 1167 | dtype : dtype |
| 1168 | data type |
| 1169 | part_policy : PartitionPolicy |
| 1170 | partition policy. |
| 1171 | init_func : func |
| 1172 | UDF init function |
| 1173 | is_gdata : bool |
| 1174 | Whether the created tensor is a ndata/edata or not. |
| 1175 | """ |
| 1176 | assert len(name) > 0, "name cannot be empty." |
| 1177 | assert len(shape) > 0, "shape cannot be empty" |
| 1178 | assert name not in self._data_name_list, ( |
| 1179 | "data name: %s already exists." % name |
| 1180 | ) |
| 1181 | self.barrier() |
| 1182 | shape = list(shape) |
| 1183 | |
| 1184 | # Send request to the servers to initialize data. |
| 1185 | # The servers may handle the duplicated initializations. |
| 1186 | part_shape = shape.copy() |
| 1187 | part_shape[0] = part_policy.get_part_size() |
| 1188 | request = InitDataRequest( |
| 1189 | name, |
| 1190 | tuple(part_shape), |
| 1191 | F.reverse_data_type_dict[dtype], |
| 1192 | part_policy.policy_str, |
| 1193 | init_func, |
| 1194 | ) |
| 1195 | # The request is sent to the servers in one group, which are on the same machine. |
| 1196 | for n in range(self._group_count): |
| 1197 | server_id = part_policy.part_id * self._group_count + n |
| 1198 | rpc.send_request(server_id, request) |
| 1199 | for _ in range(self._group_count): |
| 1200 | response = rpc.recv_response() |
| 1201 | assert response.msg == INIT_MSG |
| 1202 | |
| 1203 | self.barrier() |
| 1204 | # Create local shared-data |
| 1205 | local_shape = shape.copy() |
| 1206 | local_shape[0] = part_policy.get_part_size() |
| 1207 | if name in self._part_policy: |
| 1208 | raise RuntimeError("Policy %s has already exists!" % name) |
| 1209 | if name in self._data_store: |
| 1210 | raise RuntimeError("Data %s has already exists!" % name) |
| 1211 | if name in self._full_data_shape: |
| 1212 | raise RuntimeError("Data shape %s has already exists!" % name) |