A context.
| 14 | |
| 15 | |
class Context:
    """A named context holding per-endpoint connection settings.

    A freshly constructed context is not tied to any file: ``meta_path``
    and ``tls_path`` both start out as the placeholder string
    ``"IN MEMORY"``, and ``context_type`` starts out as ``None``.
    """

    def __init__(self, name, orchestrator=None, host=None, endpoints=None,
                 tls=False):
        """Build a context, creating a default endpoint when none is given.

        Args:
            name: Name of the context; must be truthy.
            orchestrator: Optional orchestrator name. When no endpoints are
                supplied, the default endpoint is keyed ``"docker"`` if this
                is falsy or ``"swarm"``, otherwise it is keyed by this value.
            host: Forwarded to ``get_context_host`` whenever a ``"Host"``
                value has to be derived.
            endpoints: Optional mapping of endpoint name to a settings dict.
                Each settings dict is shallow-copied, so the caller's dicts
                are never modified.
            tls: Forwarded to ``get_context_host``; its negation is the
                default for ``"SkipTLSVerify"``.

        Raises:
            Exception: If ``name`` is empty.
            ContextException: If any value in ``endpoints`` is not a dict.
        """
        if not name:
            raise Exception("Name not provided")
        self.name = name
        self.context_type = None
        self.orchestrator = orchestrator
        self.endpoints = {}
        self.tls_cfg = {}
        self.meta_path = "IN MEMORY"
        self.tls_path = "IN MEMORY"

        if not endpoints:
            # No endpoint supplied: fall back to a single default endpoint.
            default_endpoint = (
                "docker"
                if not orchestrator or orchestrator == "swarm"
                else orchestrator
            )
            self.endpoints = {
                default_endpoint: {
                    "Host": get_context_host(host, tls),
                    "SkipTLSVerify": not tls,
                },
            }
            return

        for endpoint_name, settings in endpoints.items():
            if not isinstance(settings, dict):
                # unknown format
                raise ContextException(
                    f"Unknown endpoint format for context {name}: {settings}",
                )

            # Copy so that filling in defaults below does not write into the
            # dict object owned by the caller.
            entry = dict(settings)
            self.endpoints[endpoint_name] = entry
            if endpoint_name != "docker":
                # Only the "docker" endpoint gets defaults filled in.
                continue

            # Resolve the fallback host only when it is actually needed.
            if "Host" not in entry:
                entry["Host"] = get_context_host(host, tls)
            entry["SkipTLSVerify"] = bool(
                entry.get("SkipTLSVerify", not tls))

    def set_endpoint(
            self, name="docker", host=None, tls_cfg=None,
            skip_tls_verify=False, def_namespace=None):
        """Create or replace the endpoint stored under ``name``.

        Args:
            name: Endpoint key; defaults to ``"docker"``.
            host: Forwarded to ``get_context_host`` together with
                ``not skip_tls_verify`` to produce the ``"Host"`` value.
            tls_cfg: Optional TLS configuration; when truthy it is stored in
                ``self.tls_cfg`` under ``name``.
            skip_tls_verify: Stored as the endpoint's ``"SkipTLSVerify"``.
            def_namespace: Optional value stored as ``"DefaultNamespace"``
                when truthy.
        """
        endpoint = {
            "Host": get_context_host(host, not skip_tls_verify),
            "SkipTLSVerify": skip_tls_verify,
        }
        if def_namespace:
            endpoint["DefaultNamespace"] = def_namespace
        self.endpoints[name] = endpoint

        if tls_cfg:
            self.tls_cfg[name] = tls_cfg
no outgoing calls