| 9 | |
| 10 | |
| 11 | class SystemdAPIHandoverTest(BaseTest): |
| 12 | aptly_server = None |
| 13 | socket_path = "/tmp/_aptly_systemdapihandovertest.sock" |
| 14 | |
| 15 | def prepare(self): |
| 16 | # On Debian they use /lib on other systems /usr/lib. |
| 17 | systemd_activate = "/usr/lib/systemd/systemd-activate" |
| 18 | if not os.path.exists(systemd_activate): |
| 19 | systemd_activate = "/lib/systemd/systemd-activate" |
| 20 | if not os.path.exists(systemd_activate): |
| 21 | print("Could not find systemd-activate") |
| 22 | return |
| 23 | self.aptly_server = self._start_process("%s -l %s aptly api serve -no-lock" % |
| 24 | (systemd_activate, self.socket_path),) |
| 25 | super(SystemdAPIHandoverTest, self).prepare() |
| 26 | |
| 27 | def shutdown(self): |
| 28 | if self.aptly_server is not None: |
| 29 | self.aptly_server.terminate() |
| 30 | self.aptly_server.wait() |
| 31 | self.aptly_server = None |
| 32 | if os.path.exists(self.socket_path): |
| 33 | os.remove(self.socket_path) |
| 34 | super(SystemdAPIHandoverTest, self).shutdown() |
| 35 | |
| 36 | def run(self): |
| 37 | pass |
| 38 | |
| 39 | """ |
| 40 | Verify we can listen on a unix domain socket. |
| 41 | """ |
| 42 | def check(self): |
| 43 | if self.aptly_server is None: |
| 44 | print("Skipping test as we failed to setup a listener.") |
| 45 | return |
| 46 | session = requests_unixsocket.Session() |
| 47 | r = session.get('http+unix://%s/api/version' % urllib.parse.quote(self.socket_path, safe='')) |
| 48 | self.check_equal(r.json(), {'Version': os.environ['APTLY_VERSION']}) |
nothing calls this directly
no outgoing calls
no test coverage detected