BaseTest + testing aptly API
| 20 | |
| 21 | |
| 22 | class APITest(BaseTest): |
| 23 | """ |
| 24 | BaseTest + testing aptly API |
| 25 | """ |
# Class-level handle of the server process; stays None until prepare()
# stores the result of _start_process() here.
aptly_server = None
# Class-level TestOut buffer passed as the server's stdout and stderr;
# created in prepare() and read back by debug_output().
aptly_out = None
# NOTE(review): presumably consumed by BaseTest to enable debug output -- confirm.
debugOutput = True
# host:port passed to `aptly api serve -listen=` and used to build request URLs.
base_url = "127.0.0.1:8765"
# NOTE(review): presumably merged into the aptly configuration by BaseTest -- confirm.
# rootDir is resolved from $HOME when the class body is executed.
configOverride = {
    "FileSystemPublishEndpoints": {
        "apiandserve": {
            "rootDir": f"{os.environ['HOME']}/{BaseTest.aptlyDir}/apiandserve",
            "linkMethod": "symlink"
        }
    },
    "enableMetricsEndpoint": True,
    "enableSwaggerEndpoint": True
}
| 40 | |
def fixture_available(self):
    """
    Tell whether this test's fixture can be used.

    The parent class's check must pass and the module-level name
    ``requests`` must not be None.
    """
    parent_ok = super(APITest, self).fixture_available()
    if not parent_ok:
        # Hand the parent's falsy result back unchanged.
        return parent_ok
    return requests is not None
| 43 | |
def prepare(self):
    """
    Make sure an API server process exists, then reset per-test state.

    When no server handle is stored on the class, run the parent
    prepare(), create a fresh TestOut buffer and launch
    ``aptly api serve`` with its stdout/stderr directed to that buffer.
    Otherwise only clear the existing buffer. In both cases the
    ``upload`` directory under the aptly dir is removed if present and
    prepare_fixture() is called.
    """
    if APITest.aptly_server is not None:
        # A server handle is already stored: only discard earlier output.
        APITest.aptly_out.clear()
    else:
        super(APITest, self).prepare()

        APITest.aptly_out = TestOut()

        configPath = os.path.join(os.environ["HOME"], self.aptlyConfigFile)
        command = f"aptly api serve -no-lock -config={configPath} -listen={self.base_url}"
        APITest.aptly_server = self._start_process(
            command,
            stdout=APITest.aptly_out,
            stderr=APITest.aptly_out,
        )
        # Fixed one-second pause right after launching the process.
        time.sleep(1)

    upload_dir = os.path.join(os.environ["HOME"], self.aptlyDir, "upload")
    if os.path.exists(upload_dir):
        shutil.rmtree(upload_dir)

    self.prepare_fixture()
| 60 | |
def debug_output(self):
    """Return the contents of the class-level server output buffer."""
    captured = APITest.aptly_out
    return captured.get_contents()
| 63 | |
def run(self):
    """No-op: performs no work and returns None."""
    return None
| 66 | |
def get(self, uri, *args, **kwargs):
    """
    Send a GET request to ``uri`` on the server at ``self.base_url``.

    Extra positional and keyword arguments are forwarded to
    ``requests.get``; its return value is returned as-is.
    """
    url = "http://%s%s" % (self.base_url, uri)
    return requests.get(url, *args, **kwargs)
| 69 | |
def post(self, uri, *args, **kwargs):
    """
    Send a POST request to ``uri`` on the server at ``self.base_url``.

    When a ``json`` keyword is given, its value is serialized with
    ``json.dumps`` and sent as the request body with a
    ``Content-Type: application/json`` header. All other positional and
    keyword arguments are forwarded to ``requests.post``, whose return
    value is returned as-is.

    The caller's ``headers`` mapping is never modified: the content type
    is written into a copy, so a headers dict reused across calls does
    not keep the JSON content type. ``headers=None`` is treated as
    "no headers".
    """
    if "json" in kwargs:
        kwargs["data"] = json.dumps(kwargs.pop("json"))
        # Copy before writing so the caller's dict is left untouched;
        # `or {}` also covers an explicit headers=None.
        headers = dict(kwargs.get("headers") or {})
        headers["Content-Type"] = "application/json"
        kwargs["headers"] = headers
    return requests.post("http://%s%s" % (self.base_url, uri), *args, **kwargs)
| 77 | |
| 78 | def _ensure_async(self, kwargs): |
| 79 | # Make sure we run this as an async task |
nothing calls this directly
no outgoing calls
no test coverage detected