| 11 | |
| 12 | |
| 13 | class Binlog2sql(object): |
| 14 | |
def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
             start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
             flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None):
    """
    Validate the arguments, connect to MySQL and record which binlog files to read.

    connection_settings: keyword arguments passed straight to pymysql.connect(), e.g.
        {'host': '127.0.0.1', 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}
    start_file: name of the first binlog file (required; must exist on the server).
    start_pos: position inside start_file; a falsy value means 4.
    end_file: name of the last binlog file; defaults to start_file.
    end_pos: stored unchanged on the instance.
    start_time / stop_time: optional strings in "%Y-%m-%d %H:%M:%S" format; when omitted
        they default to 1980-01-01 00:00:00 and 2999-12-31 00:00:00 respectively.
    only_schemas / only_tables: optional filters; empty values are normalised to None.
    no_pk, flashback, stop_never, back_interval, only_dml: stored unchanged on the instance.
    sql_type: optional iterable of strings, stored upper-cased as a list (empty when omitted).

    Raises ValueError when start_file is missing or unknown to the server, when a binlog
    file name has no numeric suffix, when SHOW MASTER STATUS returns no row, or when the
    server reports a falsy server_id.
    """

    if not start_file:
        raise ValueError('Lack of parameter: start_file')

    time_format = "%Y-%m-%d %H:%M:%S"

    self.conn_setting = connection_settings
    self.start_file = start_file
    self.start_pos = start_pos or 4  # use binlog v4
    self.end_file = end_file or start_file
    self.end_pos = end_pos

    # Wide-open sentinels are used when the caller gives no time bound.
    if start_time:
        self.start_time = datetime.datetime.strptime(start_time, time_format)
    else:
        self.start_time = datetime.datetime(1980, 1, 1, 0, 0, 0)
    if stop_time:
        self.stop_time = datetime.datetime.strptime(stop_time, time_format)
    else:
        self.stop_time = datetime.datetime(2999, 12, 31, 0, 0, 0)

    self.only_schemas = only_schemas or None
    self.only_tables = only_tables or None
    self.no_pk = no_pk
    self.flashback = flashback
    self.stop_never = stop_never
    self.back_interval = back_interval
    self.only_dml = only_dml
    self.sql_type = [t.upper() for t in sql_type] if sql_type else []

    def binlog_seq(name):
        # 'mysql-bin.000123' -> 123. rsplit keeps working when the base name itself
        # contains dots, and the int() conversion orders sequence numbers of
        # different widths correctly (string comparison does not).
        return int(name.rsplit('.', 1)[-1])

    self.binlogList = []
    self.connection = pymysql.connect(**self.conn_setting)
    # Open an explicit cursor instead of `with self.connection as cursor`: what the
    # connection's own context manager yields (and whether it closes the connection
    # on exit) differs between PyMySQL releases, while Connection.cursor() is stable.
    with self.connection.cursor() as cursor:
        cursor.execute("SHOW MASTER STATUS")
        master_status = cursor.fetchone()
        if not master_status:
            # No row at all; fail clearly instead of with a TypeError on None[:2].
            raise ValueError('SHOW MASTER STATUS returned no rows; is binary logging enabled?')
        self.eof_file, self.eof_pos = master_status[:2]

        cursor.execute("SHOW MASTER LOGS")
        bin_index = [row[0] for row in cursor.fetchall()]
        if self.start_file not in bin_index:
            raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)

        # Keep every server binlog whose sequence number lies in [start_file, end_file].
        first_seq = binlog_seq(self.start_file)
        last_seq = binlog_seq(self.end_file)
        for binary in bin_index:
            if first_seq <= binlog_seq(binary) <= last_seq:
                self.binlogList.append(binary)

        cursor.execute("SELECT @@server_id")
        self.server_id = cursor.fetchone()[0]
        if not self.server_id:
            raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))
| 63 | |
| 64 | def process_binlog(self): |
| 65 | stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id, |
| 66 | log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, |
| 67 | only_tables=self.only_tables, resume_stream=True, blocking=True) |
| 68 | |
| 69 | flag_last_event = False |
| 70 | e_start_pos, last_pos = stream.log_pos, stream.log_pos |