(self)
| 62 | raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port'])) |
| 63 | |
def process_binlog(self):
    """Read events from the binlog stream and emit SQL built from them.

    A ``BinLogStreamReader`` is opened at ``self.start_file`` /
    ``self.start_pos`` and iterated event by event:

    * ``QueryEvent`` (unless ``self.only_dml`` is set): the SQL returned by
      ``concat_sql_from_binlog_event`` is printed when it is non-empty.
    * DML events whose ``event_type`` is in ``self.sql_type``: one statement
      is built per row. With ``self.flashback`` set the statement is written
      to a temporary file, otherwise it is printed.

    Unless ``self.stop_never`` is set, every event is first checked against
    the configured start/stop times and end/eof file positions, which can
    skip the event or end the loop.

    After the loop the stream and the temporary file handle are closed and,
    in flashback mode, ``self.print_rollback_sql`` is called with the
    temporary file's name.

    Raises:
        Whatever the stream reader, the database connection or the SQL
        helpers raise; the stream is closed before such an exception
        propagates.
    """
    stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
                                log_file=self.start_file, log_pos=self.start_pos,
                                only_schemas=self.only_schemas, only_tables=self.only_tables,
                                resume_stream=True, blocking=True)

    # Events of these types never update ``last_pos``.
    position_neutral_events = (RotateEvent, FormatDescriptionEvent)

    flag_last_event = False
    e_start_pos, last_pos = stream.log_pos, stream.log_pos
    # to simplify code, we do not use flock for tmp_file.
    tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
    # NOTE(review): ``self.connection as cursor`` relies on the connection's
    # context manager yielding a cursor -- confirm against the installed driver.
    with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
        try:
            for binlog_event in stream:
                if not self.stop_never:
                    try:
                        event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
                    except OSError:
                        # The platform rejected the timestamp; fall back to a
                        # fixed early date so the comparisons below still work.
                        event_time = datetime.datetime(1980, 1, 1, 0, 0)
                    if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
                            (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
                        # Exactly at the end/eof position: handle this event, then stop.
                        flag_last_event = True
                    elif event_time < self.start_time:
                        # Before the start time: remember the position, skip the event.
                        if not isinstance(binlog_event, position_neutral_events):
                            last_pos = binlog_event.packet.log_pos
                        continue
                    elif (stream.log_file not in self.binlogList) or \
                            (self.end_pos and stream.log_file == self.end_file
                             and stream.log_pos > self.end_pos) or \
                            (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
                            (event_time >= self.stop_time):
                        # Outside the requested file/position/time range: stop.
                        break
                    # Events matching none of the branches above are processed normally.

                if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
                    # Record the position seen just before this BEGIN; it is
                    # passed along as ``e_start_pos`` for the following rows.
                    e_start_pos = last_pos

                if isinstance(binlog_event, QueryEvent) and not self.only_dml:
                    sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
                                                       flashback=self.flashback, no_pk=self.no_pk)
                    if sql:
                        print(sql)
                elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type:
                    for row in binlog_event.rows:
                        sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
                                                           no_pk=self.no_pk, row=row,
                                                           flashback=self.flashback,
                                                           e_start_pos=e_start_pos)
                        if self.flashback:
                            f_tmp.write(sql + '\n')
                        else:
                            print(sql)

                if not isinstance(binlog_event, position_neutral_events):
                    last_pos = binlog_event.packet.log_pos
                if flag_last_event:
                    break
        finally:
            # Close the stream even when event processing raises (or the user
            # interrupts a blocking read) so it is not left open.
            stream.close()

        # Close the handle before the same file name is handed to
        # print_rollback_sql (presumably so buffered writes reach the file).
        f_tmp.close()
        if self.flashback:
            self.print_rollback_sql(filename=tmp_file)
no test coverage detected