MCPcopy Index your code
hub / github.com/prometheus/client_python / text_fd_to_metric_families

Function text_fd_to_metric_families

prometheus_client/parser.py:275–370  ·  view source on GitHub ↗

Parse Prometheus text format from a file descriptor. This is a laxer parser than the main Go parser, so successful parsing does not imply that the parsed text meets the specification. Yields Metric's.

(fd: TextIO)

Source from the content-addressed store, hash-verified

273
274
275def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
276 """Parse Prometheus text format from a file descriptor.
277
278 This is a laxer parser than the main Go parser,
279 so successful parsing does not imply that the parsed
280 text meets the specification.
281
282 Yields Metric's.
283 """
284 name = ''
285 documentation = ''
286 typ = 'untyped'
287 samples: List[Sample] = []
288 allowed_names = []
289
290 def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
291 # Munge counters into OpenMetrics representation
292 # used internally.
293 if typ == 'counter':
294 if name.endswith('_total'):
295 name = name[:-6]
296 else:
297 new_samples = []
298 for s in samples:
299 new_samples.append(Sample(s[0] + '_total', *s[1:]))
300 samples = new_samples
301 metric = Metric(name, documentation, typ)
302 metric.samples = samples
303 return metric
304
305 for line in fd:
306 line = line.strip()
307
308 if line.startswith('#'):
309 parts = _split_quoted(line, None, 3)
310 if len(parts) < 2:
311 continue
312 candidate_name, quoted = '', False
313 if len(parts) > 2:
314 # Ignore comment tokens
315 if parts[1] != 'TYPE' and parts[1] != 'HELP':
316 continue
317 candidate_name, quoted = _unquote_unescape(parts[2])
318 if not quoted and not _is_valid_legacy_metric_name(candidate_name):
319 raise ValueError
320 if parts[1] == 'HELP':
321 if candidate_name != name:
322 if name != '':
323 yield build_metric(name, documentation, typ, samples)
324 # New metric
325 name = candidate_name
326 typ = 'untyped'
327 samples = []
328 allowed_names = [candidate_name]
329 if len(parts) == 4:
330 documentation = _replace_help_escaping(parts[3])
331 else:
332 documentation = ''

Callers 1

Calls 7

_split_quotedFunction · 0.85
_unquote_unescapeFunction · 0.85
_replace_help_escapingFunction · 0.85
build_metricFunction · 0.70
_parse_sampleFunction · 0.70
getMethod · 0.45

Tested by

no test coverage detected