Kills all processes associated with a worker node. Args: node: The worker node whose associated processes will be removed.
(self, node, allow_graceful=True)
| 286 | return node |
| 287 | |
def remove_node(self, node, allow_graceful=True):
    """Kill every process belonging to ``node`` and stop tracking it.

    Handles both the head node and worker nodes: when ``node`` is the head
    node, ``self.head_node`` is reset to ``None``; otherwise ``node`` is
    removed from ``self.worker_nodes``.

    Args:
        node: The node whose associated processes will be killed.
        allow_graceful: Passed through to ``kill_all_processes``.

    Raises:
        ValueError: If ``node`` uses the same raylet socket as the node the
            current driver is connected to. ``ray.shutdown()`` is called
            before this is raised.
        AssertionError: If any of the node's processes are still alive after
            killing (only checked when assertions are enabled).
    """
    driver_node = ray._private.worker.global_worker.node
    if (
        driver_node is not None
        and node._raylet_socket_name == driver_node._raylet_socket_name
    ):
        # Killing the driver's own node would break the driver, so shut Ray
        # down and refuse the request.
        ray.shutdown()
        raise ValueError(
            "Removing a node that is connected to this Ray client "
            "is not allowed because it will break the driver. "
            "You can use the get_other_node utility to avoid removing "
            "a node that the Ray client is connected."
        )

    node.destroy_external_storage()

    removing_head = self.head_node == node
    target = self.head_node if removing_head else node
    # wait=True: we have to wait so the raylet does not become a zombie,
    # which would prevent workers from exiting.
    target.kill_all_processes(
        check_alive=False, allow_graceful=allow_graceful, wait=True
    )
    if removing_head:
        self.head_node = None
        # TODO(rliaw): Do we need to kill all worker processes?
    else:
        self.worker_nodes.remove(node)

    assert not node.any_processes_alive(), (
        "There are zombie processes left over after killing."
    )
| 327 | def _wait_for_node(self, node, timeout: float = 30): |
| 328 | """Wait until this node has appeared in the client table. |