Gets the float value of metric. If tags is specified, searched for metric with matching tags. Returns -1 if the metric isn't available.
(
metric: str,
expected_tags: Optional[Dict[str, str]],
timeseries: Optional[PrometheusTimeseries] = None,
timeout: float = 20,
)
| 1549 | |
| 1550 | |
| 1551 | def get_metric_float( |
| 1552 | metric: str, |
| 1553 | expected_tags: Optional[Dict[str, str]], |
| 1554 | timeseries: Optional[PrometheusTimeseries] = None, |
| 1555 | timeout: float = 20, |
| 1556 | ) -> float: |
| 1557 | """Gets the float value of metric. |
| 1558 | |
| 1559 | If tags is specified, searched for metric with matching tags. |
| 1560 | |
| 1561 | Returns -1 if the metric isn't available. |
| 1562 | """ |
| 1563 | if timeseries is None: |
| 1564 | timeseries = PrometheusTimeseries() |
| 1565 | samples = fetch_prometheus_metric_timeseries( |
| 1566 | [f"localhost:{TEST_METRICS_EXPORT_PORT}"], |
| 1567 | timeseries, |
| 1568 | timeout=timeout, |
| 1569 | ).get(metric, []) |
| 1570 | for sample in samples: |
| 1571 | if expected_tags.items() <= sample.labels.items(): |
| 1572 | return sample.value |
| 1573 | return -1 |
| 1574 | |
| 1575 | |
| 1576 | def check_metric_float_eq( |
no test coverage detected
searching dependent graphs…