(shutdown_only, use_actors)
| 422 | |
| 423 | @pytest.mark.parametrize("use_actors", [False, True]) |
| 424 | def test_map_operator_shutdown(shutdown_only, use_actors): |
| 425 | ray.shutdown() |
| 426 | ray.init(num_cpus=0, num_gpus=1) |
| 427 | |
| 428 | def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]: |
| 429 | time.sleep(999) |
| 430 | |
| 431 | # Create with inputs. |
| 432 | input_op = InputDataBuffer( |
| 433 | DataContext.get_current(), make_ref_bundles([[i] for i in range(10)]) |
| 434 | ) |
| 435 | compute_strategy = ActorPoolStrategy(size=1) if use_actors else TaskPoolStrategy() |
| 436 | op = MapOperator.create( |
| 437 | create_map_transformer_from_block_fn(_sleep), |
| 438 | input_op=input_op, |
| 439 | data_context=DataContext.get_current(), |
| 440 | name="TestMapper", |
| 441 | compute_strategy=compute_strategy, |
| 442 | ray_remote_args={"num_cpus": 0, "num_gpus": 1}, |
| 443 | ) |
| 444 | |
| 445 | # Start one task and then cancel. |
| 446 | op.start(ExecutionOptions()) |
| 447 | if use_actors: |
| 448 | # Wait for the actor to start. |
| 449 | run_op_tasks_sync(op) |
| 450 | op.add_input(input_op.get_next(), 0) |
| 451 | assert op.num_active_tasks() == 1 |
| 452 | # Regular Ray tasks can be interrupted/cancelled, so graceful shutdown works. |
| 453 | # Actors running time.sleep() cannot be interrupted gracefully and need ray.kill() to release resources. |
| 454 | # After proper shutdown, both should return the GPU to ray.available_resources(). |
| 455 | force_shutdown = use_actors |
| 456 | op.shutdown(timer=Timer(), force=force_shutdown) |
| 457 | |
| 458 | # Tasks/actors should be cancelled/killed. |
| 459 | wait_for_condition(lambda: (ray.available_resources().get("GPU", 0) == 1.0)) |
| 460 | |
| 461 | |
| 462 | @pytest.mark.parametrize( |
nothing calls this directly
no test coverage detected
searching dependent graphs…