Simplify a query by converting CTEs into table metadata objects
(full_text, text_before_cursor)
| 17 | |
| 18 | |
| 19 | def isolate_query_ctes(full_text, text_before_cursor): |
| 20 | """Simplify a query by converting CTEs into table metadata objects""" |
| 21 | |
| 22 | if not full_text or not full_text.strip(): |
| 23 | return full_text, text_before_cursor, () |
| 24 | |
| 25 | ctes, remainder = extract_ctes(full_text) |
| 26 | if not ctes: |
| 27 | return full_text, text_before_cursor, () |
| 28 | |
| 29 | current_position = len(text_before_cursor) |
| 30 | meta = [] |
| 31 | |
| 32 | for cte in ctes: |
| 33 | if cte.start < current_position < cte.stop: |
| 34 | # Currently editing a cte - treat its body as the current full_text |
| 35 | text_before_cursor = full_text[cte.start : current_position] |
| 36 | full_text = full_text[cte.start : cte.stop] |
| 37 | return full_text, text_before_cursor, meta |
| 38 | |
| 39 | # Append this cte to the list of available table metadata |
| 40 | cols = (ColumnMetadata(name, None, ()) for name in cte.columns) |
| 41 | meta.append(TableMetadata(cte.name, cols)) |
| 42 | |
| 43 | # Editing past the last cte (ie the main body of the query) |
| 44 | full_text = full_text[ctes[-1].stop :] |
| 45 | text_before_cursor = text_before_cursor[ctes[-1].stop : current_position] |
| 46 | |
| 47 | return full_text, text_before_cursor, tuple(meta) |
| 48 | |
| 49 | |
| 50 | def extract_ctes(sql): |
no test coverage detected