| 49 | aliases = Dict(dict(keep="HistoryTrim.keep")) # type: ignore |
| 50 | |
| 51 | def start(self): |
| 52 | profile_dir = Path(self.profile_dir.location) |
| 53 | hist_file = profile_dir / "history.sqlite" |
| 54 | # Connections must be closed, not merely committed, before the file |
| 55 | # shuffling below: Windows refuses to unlink or rename a database |
| 56 | # that still has open handles. |
| 57 | with closing(sqlite3.connect(hist_file)) as con: |
| 58 | # Grab the recent history from the current database. |
| 59 | inputs = list(con.execute('SELECT session, line, source, source_raw FROM ' |
| 60 | 'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,))) |
| 61 | if len(inputs) <= self.keep: |
| 62 | print("There are already at most %d entries in the history database." % self.keep) |
| 63 | print("Not doing anything. Use --keep= argument to keep fewer entries") |
| 64 | return |
| 65 | |
| 66 | print("Trimming history to the most recent %d entries." % self.keep) |
| 67 | |
| 68 | inputs.pop() # Remove the extra element we got to check the length. |
| 69 | inputs.reverse() |
| 70 | if inputs: |
| 71 | first_session = inputs[0][0] |
| 72 | outputs = list(con.execute('SELECT session, line, output FROM ' |
| 73 | 'output_history WHERE session >= ?', (first_session,))) |
| 74 | sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM ' |
| 75 | 'sessions WHERE session >= ?', (first_session,))) |
| 76 | |
| 77 | # Create the new history database. |
| 78 | new_hist_file = profile_dir / "history.sqlite.new" |
| 79 | i = 0 |
| 80 | while new_hist_file.exists(): |
| 81 | # Make sure we don't interfere with an existing file. |
| 82 | i += 1 |
| 83 | new_hist_file = profile_dir / ("history.sqlite.new" + str(i)) |
| 84 | with closing(sqlite3.connect(new_hist_file)) as new_db: |
| 85 | new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer |
| 86 | primary key autoincrement, start timestamp, |
| 87 | end timestamp, num_cmds integer, remark text)""") |
| 88 | new_db.execute("""CREATE TABLE IF NOT EXISTS history |
| 89 | (session integer, line integer, source text, source_raw text, |
| 90 | PRIMARY KEY (session, line))""") |
| 91 | new_db.execute("""CREATE TABLE IF NOT EXISTS output_history |
| 92 | (session integer, line integer, output text, |
| 93 | PRIMARY KEY (session, line))""") |
| 94 | new_db.commit() |
| 95 | |
| 96 | |
| 97 | if inputs: |
| 98 | with new_db: |
| 99 | # Add the recent history into the new database. |
| 100 | new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions) |
| 101 | new_db.executemany('insert into history values (?,?,?,?)', inputs) |
| 102 | new_db.executemany('insert into output_history values (?,?,?)', outputs) |
| 103 | |
| 104 | if self.backup: |
| 105 | i = 1 |
| 106 | backup_hist_file = profile_dir / ("history.sqlite.old.%d" % i) |
| 107 | while backup_hist_file.exists(): |
| 108 | i += 1 |