MCPcopy
hub / github.com/MagicStack/asyncpg / get_status

Method get_status

asyncpg/cluster.py:100–125  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

98 return self._data_dir
99
100 def get_status(self):
101 if self._pg_ctl is None:
102 self._init_env()
103
104 process = subprocess.run(
105 [self._pg_ctl, 'status', '-D', self._data_dir],
106 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
107 stdout, stderr = process.stdout, process.stderr
108
109 if (process.returncode == 4 or not os.path.exists(self._data_dir) or
110 not os.listdir(self._data_dir)):
111 return 'not-initialized'
112 elif process.returncode == 3:
113 return 'stopped'
114 elif process.returncode == 0:
115 r = re.match(r'.*PID\s?:\s+(\d+).*', stdout.decode())
116 if not r:
117 raise ClusterError(
118 'could not parse pg_ctl status output: {}'.format(
119 stdout.decode()))
120 self._daemon_pid = int(r.group(1))
121 return self._test_connection(timeout=0)
122 else:
123 raise ClusterError(
124 'pg_ctl status exited with status {:d}: {}'.format(
125 process.returncode, stderr))
126
127 async def connect(self, loop=None, **kwargs):
128 conn_info = self.get_connection_spec()

Callers 13

initMethod · 0.95
startMethod · 0.95
reloadMethod · 0.95
destroyMethod · 0.95
get_connection_specMethod · 0.95
reset_walMethod · 0.95
reset_hbaMethod · 0.95
add_hba_entryMethod · 0.95
initMethod · 0.45
_shutdown_clusterFunction · 0.45

Calls 4

_init_envMethod · 0.95
_test_connectionMethod · 0.95
ClusterErrorClass · 0.85
runMethod · 0.80

Tested by

no test coverage detected