| 62 | self.detached = detached |
| 63 | |
| 64 | def get_placement_groups(self, strategy="STRICT_PACK", name=None): |
| 65 | if self.pgs is not None: |
| 66 | return self.pgs |
| 67 | |
| 68 | pg_name_prefix = name if name else \ |
| 69 | f"{self.name_prefix}verl_group_{'_'.join([str(count) for count in self._store])}:" |
| 70 | # print(f"pg_name_prefix = {pg_name_prefix}") |
| 71 | pg_scheme = [[{ |
| 72 | "CPU": self.max_collocate_count, |
| 73 | "GPU": 1 |
| 74 | } if self.use_gpu else { |
| 75 | "CPU": self.max_collocate_count |
| 76 | } for _ in range(process_count)] for process_count in self._store] |
| 77 | |
| 78 | lifetime = 'detached' if self.detached else None |
| 79 | |
| 80 | pgs = [ |
| 81 | placement_group(bundles=bundles, strategy=strategy, name=pg_name_prefix + str(idx), lifetime=lifetime) |
| 82 | for idx, bundles in enumerate(pg_scheme) |
| 83 | ] |
| 84 | |
| 85 | ray.get([pg.ready() for pg in pgs]) |
| 86 | |
| 87 | self.pgs = pgs |
| 88 | return pgs |
| 89 | |
| 90 | |
| 91 | def extract_pg_from_exist(resource_pools: Dict[str, RayResourcePool], src_role_names: List[str], |