MCPcopy
hub / github.com/pimutils/vdirsyncer / HttpStorage

Class HttpStorage

vdirsyncer/storage/http.py:10–64  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

8
9
class HttpStorage(Storage):
    """Read-only storage backed by a single HTTP resource.

    The whole collection is downloaded from ``url`` with one GET request,
    split into items and cached on the instance, so that :meth:`get` can
    answer lookups from the cache.
    """

    storage_name = 'http'
    read_only = True
    _repr_attributes = ('username', 'url')

    # Cache filled by list(): maps item.ident -> (item, item.hash).
    # None means the collection has not been fetched yet.
    _items = None

    # Required for tests.
    _ignore_uids = True

    def __init__(self, url, username='', password='', verify=True, auth=None,
                 useragent=USERAGENT, verify_fingerprint=None, auth_cert=None,
                 **kwargs):
        """Prepare the request settings and resolve the final URL.

        :param url: Address of the resource to download.
        :param username: Passed to ``prepare_auth``; also kept on the
            instance.
        :param password: Passed to ``prepare_auth``; also kept on the
            instance.
        :param verify: Passed to ``prepare_verify``.
        :param auth: Passed to ``prepare_auth``.
        :param useragent: Value sent as the ``User-Agent`` header.
        :param verify_fingerprint: Passed to ``prepare_verify``.
        :param auth_cert: Passed to ``prepare_client_cert``.
        :param kwargs: Forwarded to the parent initializer. If it contains
            a ``collection`` that is not ``None``, that value is joined
            onto ``url`` with ``urlparse.urljoin``.
        """
        super().__init__(**kwargs)

        # Keyword arguments reused for every call to ``request``.
        self._settings = {
            'auth': prepare_auth(auth, username, password),
            'cert': prepare_client_cert(auth_cert),
            'latin1_fallback': False,
        }
        self._settings.update(prepare_verify(verify, verify_fingerprint))

        self.username, self.password = username, password
        self.useragent = useragent

        collection = kwargs.get('collection')
        if collection is not None:
            url = urlparse.urljoin(url, collection)
        self.url = url
        self.parsed_url = urlparse.urlparse(self.url)

    def _default_headers(self):
        """Return the headers sent with every request."""
        return {'User-Agent': self.useragent}

    def list(self):
        """Download the collection and return its ``(href, etag)`` pairs.

        Every call issues a new GET request. The cache used by :meth:`get`
        is replaced only after the whole response has been processed.

        :returns: A generator of ``(href, etag)`` tuples, where ``href`` is
            ``item.ident`` and ``etag`` is ``item.hash``.
        """
        r = request('GET', self.url, headers=self._default_headers(),
                    **self._settings)

        # Build the listing in a local dict and publish it in one step.
        # Assigning self._items before parsing would leave a partial cache
        # behind if an item fails to parse, and get() would then treat
        # that partial cache as the complete collection.
        items = {}
        for raw in split_collection(r.text):
            item = Item(raw)
            if self._ignore_uids:
                # Replace the item's own UID with its hash.
                item = item.with_uid(item.hash)

            items[item.ident] = item, item.hash

        self._items = items
        return ((href, etag) for href, (_, etag) in items.items())

    def get(self, href):
        """Return the cached ``(item, etag)`` tuple stored under ``href``.

        Calls :meth:`list` first if nothing has been cached yet.

        :raises exceptions.NotFoundError: If ``href`` is not in the cache.
        """
        if self._items is None:
            self.list()

        try:
            return self._items[href]
        except KeyError:
            # The KeyError carries nothing beyond ``href``; suppress it so
            # the traceback shows only the storage-level error.
            raise exceptions.NotFoundError(href) from None

Callers 3

test_listFunction · 0.90
test_readonly_paramFunction · 0.90

Calls

no outgoing calls

Tested by 3

test_listFunction · 0.72
test_readonly_paramFunction · 0.72