MCPcopy
hub / github.com/IBM/AssetOpsBench / CouchClient

Class CouchClient

src/servers/wo/couch.py:25–118  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

23
24
25class CouchClient:
26 def __init__(
27 self,
28 base_url: str,
29 db: str,
30 *,
31 username: Optional[str] = None,
32 password: Optional[str] = None,
33 timeout: float = 10.0,
34 ):
35 if httpx is None:
36 raise CouchError(
37 "httpx is required for the real CouchClient (pip install httpx)"
38 )
39 self.db = db
40 auth = (username, password) if username else None
41 self._c = httpx.AsyncClient(
42 base_url=base_url.rstrip("/"), auth=auth, timeout=timeout
43 )
44
45 async def aclose(self) -> None:
46 await self._c.aclose()
47
48 # ---- document CRUD ----
49 async def get(self, doc_id: str) -> Optional[Dict[str, Any]]:
50 r = await self._c.get(f"/{self.db}/{doc_id}")
51 if r.status_code == 404:
52 return None
53 r.raise_for_status()
54 return r.json()
55
56 async def put(self, doc: Dict[str, Any]) -> Dict[str, Any]:
57 if "_id" not in doc:
58 raise CouchError("document must have _id")
59 r = await self._c.put(f"/{self.db}/{doc['_id']}", json=doc)
60 if r.status_code == 409:
61 raise CouchError(f"conflict updating {doc['_id']} (stale _rev)")
62 r.raise_for_status()
63 return r.json()
64
65 async def delete(self, doc_id: str, rev: str) -> Dict[str, Any]:
66 r = await self._c.delete(f"/{self.db}/{doc_id}", params={"rev": rev})
67 r.raise_for_status()
68 return r.json()
69
70 # ---- queries ----
71 async def find(
72 self,
73 selector: Dict[str, Any],
74 *,
75 fields: Optional[List[str]] = None,
76 sort: Optional[List[Dict[str, str]]] = None,
77 limit: int = 200,
78 skip: int = 0,
79 ) -> List[Dict[str, Any]]:
80 body: Dict[str, Any] = {"selector": selector, "limit": limit, "skip": skip}
81 if fields:
82 body["fields"] = fields

Callers 1

dbFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected