MCPcopy
hub / github.com/HelloZeroNet/ZeroNet / query

Method query

src/Site/SiteStorage.py:191–210  ·  view source on GitHub ↗
(self, query, params=None)

Source from the content-addressed store, hash-verified

189
190 # Execute sql query or rebuild on dberror
def query(self, query, params=None):
    """Run a read-only SQL query, rebuilding the database once on failure.

    If ``self.event_db_busy`` is set, the call first waits on that event
    before touching the database. When the first attempt fails with a plain
    ``sqlite3.DatabaseError`` the database is rebuilt via ``rebuildDb()``
    and the query is retried once on ``self.db.cur``.

    Args:
        query: SQL text. Must start with ``SELECT`` (case-insensitive,
            surrounding whitespace ignored).
        params: Optional query parameters, passed through unchanged to the
            underlying ``execute`` call.

    Returns:
        Whatever the underlying ``execute`` call returns.

    Raises:
        Exception: If ``query`` is not a SELECT statement.
        sqlite3.DatabaseError: Any subclass error (e.g.
            ``sqlite3.OperationalError``) from the first attempt is
            propagated unchanged; errors from the retry also propagate.
    """
    if not query.strip().upper().startswith("SELECT"):
        raise Exception("Only SELECT query supported")

    if self.event_db_busy:  # Db not ready for queries
        self.log.debug("Wating for db...")
        self.event_db_busy.get()  # Wait for event

    try:
        return self.getDb().execute(query, params)
    except sqlite3.DatabaseError as err:
        # Only the exact base class triggers a rebuild; subclasses such as
        # OperationalError or IntegrityError are the caller's problem.
        # NOTE(review): presumably the bare DatabaseError indicates a
        # corrupt/unreadable db file - confirm against the Db wrapper.
        if type(err) is not sqlite3.DatabaseError:
            raise  # Bare raise keeps the original traceback intact

        self.log.error("Database error: %s, query: %s, try to rebuilding it..." % (err, query))
        try:
            self.rebuildDb()
        except sqlite3.OperationalError as rebuild_err:
            # Best effort: still retry below, but leave a trace of why the
            # rebuild did not complete instead of swallowing it silently.
            self.log.error("Database rebuild failed: %s" % rebuild_err)
        return self.db.cur.execute(query, params)
211
212 # Open file object
213 def open(self, inner_path, mode="rb", create_dirs=False, **kwargs):

Callers 10

dbQueryMethod · 0.80
actionFileQueryMethod · 0.80
actionDbQueryMethod · 0.80
testCloneMethod · 0.80
testArchivedDownloadMethod · 0.80
actionFeedQueryMethod · 0.80
actionFeedSearchMethod · 0.80
testIncludeAddMethod · 0.80
testIncludeChangeMethod · 0.80

Calls 5

getDbMethod · 0.95
rebuildDbMethod · 0.95
errorMethod · 0.80
getMethod · 0.45
executeMethod · 0.45

Tested by

no test coverage detected