(self, spark, eval_function, space, trials)
| 284 | """ |
| 285 | |
| 286 | def __init__(self, spark, eval_function, space, trials): |
| 287 | self.spark = spark |
| 288 | self.eval_function = eval_function |
| 289 | self.space = space |
| 290 | self.trials = trials |
| 291 | self._fmin_done = False |
| 292 | self._dispatcher_thread = None |
| 293 | self._task_threads = set() |
| 294 | |
| 295 | if self.trials._spark_supports_job_cancelling: |
| 296 | spark_context = spark.sparkContext |
| 297 | self._job_group_id = spark_context.getLocalProperty("spark.jobGroup.id") |
| 298 | self._job_desc = spark_context.getLocalProperty("spark.job.description") |
| 299 | interrupt_on_cancel = spark_context.getLocalProperty( |
| 300 | "spark.job.interruptOnCancel" |
| 301 | ) |
| 302 | if interrupt_on_cancel is None: |
| 303 | self._job_interrupt_on_cancel = False |
| 304 | else: |
| 305 | self._job_interrupt_on_cancel = "true" == interrupt_on_cancel.lower() |
| 306 | # In certain Spark deployments, the local property "spark.jobGroup.id" |
| 307 | # value is None, so we create one to use for SparkTrials. |
| 308 | if self._job_group_id is None: |
| 309 | self._job_group_id = "Hyperopt_SparkTrials_" + _get_random_id() |
| 310 | if self._job_desc is None: |
| 311 | self._job_desc = "Trial evaluation jobs launched by hyperopt fmin" |
| 312 | logger.debug( |
| 313 | "Job group id: {g}, job desc: {d}, job interrupt on cancel: {i}".format( |
| 314 | g=self._job_group_id, |
| 315 | d=self._job_desc, |
| 316 | i=self._job_interrupt_on_cancel, |
| 317 | ) |
| 318 | ) |
| 319 | |
| 320 | def running_trial_count(self): |
| 321 | return self.trials.count_by_state_unsynced(base.JOB_STATE_RUNNING) |
nothing calls this directly
no test coverage detected