MCPcopy Index your code
hub / github.com/danfengcao/binlog2sql / process_binlog

Method process_binlog

binlog2sql/binlog2sql.py:64–122  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

62 raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))
63
64 def process_binlog(self):
# Read binlog events from the server through a BinLogStreamReader and emit the
# SQL built for each matching event. Normal mode prints the SQL; flashback mode
# writes row-level SQL to a temporary file and hands that file to
# print_rollback_sql() at the end.
#
# NOTE(review): this listing has lost its indentation, so any nesting described
# in the comments below is inferred from statement order and variable use.
# Confirm against the original file before relying on it.
# NOTE(review): the page header gives this method's range as 64-122 but the
# listing ends at 121, so the method's final line is not shown here.
#
# The reader starts at start_file/start_pos and is restricted to the
# configured schemas and tables.
65 stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
66 log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
67 only_tables=self.only_tables, resume_stream=True, blocking=True)
68
# Set once the event sitting exactly on the end/eof position has been seen.
69 flag_last_event = False
# Both positions start at the reader's current log_pos. last_pos tracks the
# log_pos of the most recently seen event; e_start_pos is refreshed from it
# whenever a BEGIN query is encountered.
70 e_start_pos, last_pos = stream.log_pos, stream.log_pos
71 # to simplify code, we do not use flock for tmp_file.
# Temp file name is derived from "<host>.<port>" of the connection settings.
72 tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
# Open the temp file for writing and obtain a cursor from self.connection.
73 with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
74 for binlog_event in stream:
# Time/position range checks; presumably the whole if/elif chain below is
# nested under this guard, since event_time is only assigned here.
75 if not self.stop_never:
# Convert the event timestamp; if fromtimestamp() raises OSError, fall back to
# a fixed date of 1980-01-01 00:00.
76 try:
77 event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
78 except OSError:
79 event_time = datetime.datetime(1980, 1, 1, 0, 0)
# Reader is exactly at the requested end position or at the eof position:
# mark this as the last event (it is still handled below, then the loop breaks).
80 if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
81 (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
82 flag_last_event = True
# Event is older than start_time: skip it, but still advance last_pos unless it
# is a RotateEvent or FormatDescriptionEvent.
83 elif event_time < self.start_time:
84 if not (isinstance(binlog_event, RotateEvent)
85 or isinstance(binlog_event, FormatDescriptionEvent)):
86 last_pos = binlog_event.packet.log_pos
87 continue
# Stop reading when the current log file is not in binlogList, the position is
# past end_pos (in end_file) or past eof_pos (in eof_file), or the event time
# has reached stop_time.
88 elif (stream.log_file not in self.binlogList) or \
89 (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \
90 (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
91 (event_time >= self.stop_time):
92 break
93 # else:
94 # raise ValueError('unknown binlog file or position')
95
# A BEGIN query records last_pos (the log_pos of the previously seen event) as
# e_start_pos, which is later passed along when building row SQL.
96 if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
97 e_start_pos = last_pos
98
# Query events are rendered only when only_dml is false; empty results are not
# printed.
99 if isinstance(binlog_event, QueryEvent) and not self.only_dml:
100 sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
101 flashback=self.flashback, no_pk=self.no_pk)
102 if sql:
103 print(sql)
# DML events whose type is in sql_type: build one statement per row. In
# flashback mode the statement goes to the temp file, otherwise it is printed.
104 elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type:
105 for row in binlog_event.rows:
106 sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk,
107 row=row, flashback=self.flashback, e_start_pos=e_start_pos)
108 if self.flashback:
109 f_tmp.write(sql + '\n')
110 else:
111 print(sql)
112
# Remember this event's log_pos for the next iteration, ignoring
# RotateEvent/FormatDescriptionEvent.
113 if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):
114 last_pos = binlog_event.packet.log_pos
# The event at the end/eof position has now been handled; stop reading.
115 if flag_last_event:
116 break
117
# Cleanup and flashback output.
# NOTE(review): f_tmp is closed explicitly before print_rollback_sql() is given
# the file name - presumably so buffered writes reach disk before the file is
# read back. Whether these statements sit inside the `with` block cannot be
# determined from this listing; confirm.
118 stream.close()
119 f_tmp.close()
120 if self.flashback:
121 self.print_rollback_sql(filename=tmp_file)

Callers 1

binlog2sql.py — File · 0.80

Calls 6

print_rollback_sql — Method · 0.95
create_unique_file — Function · 0.90
temp_open — Function · 0.90
is_dml_event — Function · 0.90
event_type — Function · 0.90

Tested by

no test coverage detected