| 7 | |
| 8 | |
| 9 | class DBEngine(object): |
| 10 | def __init__(self, db_uri): |
| 11 | """ |
| 12 | db_uri = f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4' |
| 13 | |
| 14 | """ |
| 15 | engine = create_engine(db_uri) |
| 16 | self.session = sessionmaker(bind=engine, autocommit=True)() |
| 17 | |
| 18 | @staticmethod |
| 19 | def value_decode(row: dict): |
| 20 | """ |
| 21 | Try to decode value of table |
| 22 | datetime.datetime-->string |
| 23 | datetime.date-->string |
| 24 | json str-->dict |
| 25 | :param row: |
| 26 | :return: |
| 27 | """ |
| 28 | for k, v in row.items(): |
| 29 | if isinstance(v, datetime.datetime): |
| 30 | row[k] = v.strftime("%Y-%m-%d %H:%M:%S") |
| 31 | elif isinstance(v, datetime.date): |
| 32 | row[k] = v.strftime("%Y-%m-%d") |
| 33 | elif isinstance(v, str): |
| 34 | try: |
| 35 | row[k] = json.loads(v) |
| 36 | except ValueError: |
| 37 | pass |
| 38 | |
| 39 | def _fetch(self, query, size=-1, commit=True): |
| 40 | query = query.strip() |
| 41 | result = self.session.execute(query) |
| 42 | if query.upper()[:6] == "SELECT": |
| 43 | if size < 0: |
| 44 | al = result.fetchall() |
| 45 | al = [dict(el) for el in al] |
| 46 | for el in al: |
| 47 | self.value_decode(el) |
| 48 | return al or None |
| 49 | elif size == 1: |
| 50 | on = dict(result.fetchone()) |
| 51 | self.value_decode(on) |
| 52 | return on or None |
| 53 | else: |
| 54 | mny = result.fetchmany(size) |
| 55 | mny = [dict(el) for el in mny] |
| 56 | for el in mny: |
| 57 | self.value_decode(el) |
| 58 | return mny or None |
| 59 | elif query.upper()[:6] in ("UPDATE", "DELETE", "INSERT"): |
| 60 | return {"rowcount": result.rowcount} |
| 61 | |
| 62 | def fetchone(self, query, commit=True): |
| 63 | return self._fetch(query, size=1, commit=commit) |
| 64 | |
| 65 | def fetchmany(self, query, size, commit=True): |
| 66 | return self._fetch(query=query, size=size, commit=commit) |
no outgoing calls