Initialization function for each worker process. Assigns a specific GPU to the process and loads the tokenizer.
(rank_queue, tokenizer_path)
| 185 | |
| 186 | |
def process_init(rank_queue, tokenizer_path):
    """
    Prepare per-worker state: logging, device selection, and model loading.

    Takes one rank from ``rank_queue``, maps it to a torch device, and stores
    the loaded feature extractor and tokenizer model in the module-level
    globals ``worker_feature_extractor`` and ``worker_tokenizer``.

    Args:
        rank_queue: Object whose ``get()`` yields the rank for this worker.
            A rank of ``-1`` selects the CPU.
        tokenizer_path: Forwarded to both ``from_pretrained`` calls.
    """
    global worker_tokenizer, worker_feature_extractor

    # force=True replaces any logging configuration already present in this
    # process, so every record carries the worker's process id.
    log_format = (
        "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d]"
        " [Worker %(process)d] %(message)s"
    )
    logging.basicConfig(format=log_format, level=logging.INFO, force=True)

    # Use the assigned GPU only when a real rank was handed out and CUDA is
    # usable; otherwise fall back to the CPU.
    rank = rank_queue.get()
    use_cuda = rank != -1 and torch.cuda.is_available()
    worker_device = torch.device(f"cuda:{rank}" if use_cuda else "cpu")

    # NOTE: these DEBUG records are below the INFO level configured above,
    # so they are not emitted with this configuration.
    logging.debug(f"Worker process initialized with device: {worker_device}")

    # The feature extractor takes no device argument; only the tokenizer
    # model is given the selected device via device_map.
    worker_feature_extractor = AutoFeatureExtractor.from_pretrained(tokenizer_path)
    worker_tokenizer = HiggsAudioV2TokenizerModel.from_pretrained(
        tokenizer_path,
        device_map=worker_device,
    )
    logging.debug(f"Tokenizer loaded successfully on device {worker_device}")
| 216 | |
| 217 | |
| 218 | def process_single_sample(sample: dict[str, Any]) -> dict[str, Any]: |
nothing calls this directly
no test coverage detected