Generator that yields (table_name, columns, [rows]) from MySQL dump.
(filepath)
| 173 | |
| 174 | |
| 175 | def stream_inserts(filepath): |
| 176 | """Generator that yields (table_name, columns, [rows]) from MySQL dump.""" |
| 177 | in_insert = False |
| 178 | current_table = None |
| 179 | current_columns = None |
| 180 | current_rows = [] |
| 181 | |
| 182 | insert_pattern = re.compile(r"INSERT INTO `([^`]+)` \((.*?)\) VALUES", re.IGNORECASE) |
| 183 | |
| 184 | with open(filepath, 'r', encoding='utf-8-sig') as f: |
| 185 | for line in f: |
| 186 | line = line.rstrip('\n') |
| 187 | |
| 188 | if in_insert: |
| 189 | # Check if this is the end of the INSERT |
| 190 | stripped = line.strip() |
| 191 | if stripped.endswith(');'): |
| 192 | # Remove trailing ); |
| 193 | value = stripped[:-2].strip() |
| 194 | if value.startswith('('): |
| 195 | row = parse_insert_row(value, current_columns) |
| 196 | current_rows.append(row) |
| 197 | yield (current_table, current_columns, current_rows) |
| 198 | current_rows = [] |
| 199 | in_insert = False |
| 200 | current_table = None |
| 201 | current_columns = None |
| 202 | elif stripped.endswith('),'): |
| 203 | value = stripped[:-2].strip() |
| 204 | if value.startswith('('): |
| 205 | row = parse_insert_row(value, current_columns) |
| 206 | current_rows.append(row) |
| 207 | elif stripped.startswith('(') and stripped.endswith(')'): |
| 208 | row = parse_insert_row(stripped, current_columns) |
| 209 | current_rows.append(row) |
| 210 | else: |
| 211 | # Could be continuation of a single row |
| 212 | pass |
| 213 | |
| 214 | # Check for new INSERT statement |
| 215 | m = insert_pattern.search(line) |
| 216 | if m and not in_insert: |
| 217 | current_table = m.group(1) |
| 218 | current_columns_raw = m.group(2) |
| 219 | current_columns = [c.strip('` ') for c in current_columns_raw.split(',')] |
| 220 | current_rows = [] |
| 221 | in_insert = True |
| 222 | |
| 223 | # Check if values start on the same line |
| 224 | after_values = line[m.end():].strip() |
| 225 | if after_values.startswith('('): |
| 226 | if after_values.endswith(');'): |
| 227 | row = parse_insert_row(after_values, current_columns) |
| 228 | current_rows.append(row) |
| 229 | yield (current_table, current_columns, current_rows) |
| 230 | current_rows = [] |
| 231 | in_insert = False |
| 232 | elif after_values.endswith('),'): |
no test coverage detected