Returns a MmapedValue class based on a process_identifier function. The 'process_identifier' function MUST comply with this simple rule: when called in simultaneously running processes it MUST return distinct values. Using a different function than the default 'os.getpid' is at your own risk.
(process_identifier=os.getpid)
| 37 | |
| 38 | |
| 39 | def MultiProcessValue(process_identifier=os.getpid): |
| 40 | """Returns a MmapedValue class based on a process_identifier function. |
| 41 | |
| 42 | The 'process_identifier' function MUST comply with this simple rule: |
| 43 | when called in simultaneously running processes it MUST return distinct values. |
| 44 | |
| 45 | Using a different function than the default 'os.getpid' is at your own risk. |
| 46 | """ |
| 47 | files = {} |
| 48 | values = [] |
| 49 | pid = {'value': process_identifier()} |
| 50 | # Use a single global lock when in multi-processing mode |
| 51 | # as we presume this means there is no threading going on. |
| 52 | # This avoids the need to also have mutexes in __MmapDict. |
| 53 | lock = Lock() |
| 54 | |
| 55 | class MmapedValue: |
| 56 | """A float protected by a mutex backed by a per-process mmaped file.""" |
| 57 | |
| 58 | _multiprocess = True |
| 59 | |
def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
    """Record the metric description and bind this value to its backing file.

    The positional metric description is stored as a tuple in
    ``self._params`` so that ``__reset`` can re-bind the value later.
    Extra keyword arguments are accepted and ignored.
    """
    self._params = (
        typ,
        metric_name,
        name,
        labelnames,
        labelvalues,
        help_text,
        multiprocess_mode,
    )

    # Backwards compatibility: when only the legacy lower-case variable is
    # set, mirror it into the upper-case one and warn about the deprecation.
    # This shim can go away in a few releases.
    environ = os.environ
    legacy_only = (
        'PROMETHEUS_MULTIPROC_DIR' not in environ
        and 'prometheus_multiproc_dir' in environ
    )
    if legacy_only:
        environ['PROMETHEUS_MULTIPROC_DIR'] = environ['prometheus_multiproc_dir']
        warnings.warn(
            "prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR",
            DeprecationWarning,
        )

    # Everything touching the shared closure state (pid, files, values)
    # happens while holding the shared lock.
    with lock:
        self.__check_for_pid_change()
        self.__reset()
        values.append(self)
| 70 | |
def __reset(self):
    """(Re)bind this value to the backing file for the current process id.

    Gauges use a file prefix of ``typ + '_' + multiprocess_mode``; every
    other metric type uses ``typ`` alone.  The backing ``MmapedDict`` for a
    prefix is created on first use and cached in the shared ``files`` dict.
    """
    (kind, metric_name, sample_name, label_names, label_values,
     help_text, mode) = self._params

    prefix = kind + '_' + mode if kind == 'gauge' else kind

    if prefix not in files:
        # One file per (prefix, process id), placed in the directory named
        # by the PROMETHEUS_MULTIPROC_DIR environment variable.
        directory = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
        basename = '{}_{}.db'.format(prefix, pid['value'])
        files[prefix] = MmapedDict(os.path.join(directory, basename))

    self._file = files[prefix]
    self._key = mmap_key(metric_name, sample_name, label_names, label_values, help_text)

    # Load whatever the backing file currently holds for this key.
    stored_value, stored_timestamp = self._file.read_value(self._key)
    self._value = stored_value
    self._timestamp = stored_timestamp
| 86 | |
def __check_for_pid_change(self):
    """Detect a changed process identifier and rebind every tracked value.

    If ``process_identifier()`` still matches the recorded identifier,
    nothing happens.  Otherwise the new identifier is recorded, every
    cached backing file is closed and forgotten, and each value in the
    shared ``values`` list is reset.
    """
    current_pid = process_identifier()
    if pid['value'] == current_pid:
        return

    # There has been a fork(), reset all the values.
    pid['value'] = current_pid
    for backing_file in files.values():
        backing_file.close()
    files.clear()
    for tracked in values:
        # Private-name mangling applies here because this runs inside the
        # class body, so this resolves to each instance's own __reset.
        tracked.__reset()
no outgoing calls