Extract common table expressions (CTEs) from a query. Returns a tuple (ctes, remainder_sql): ctes is a list of TableExpression namedtuples, and remainder_sql is the text from the original query after the CTEs have been stripped.
(sql)
| 48 | |
| 49 | |
def extract_ctes(sql):
    """Extract common table expressions (CTEs) from a query.

    Returns a tuple (ctes, remainder_sql):

    ctes is a list of TableExpression namedtuples
    remainder_sql is the text from the original query after the CTEs have
    been stripped.

    If the query does not start with WITH (or parses to no statement at
    all), the result is ([], sql), i.e. the query is returned untouched.
    If nothing follows the WITH keyword, the result is ([], "").
    """

    statements = parse(sql)
    if not statements:
        # Empty input produces no statements; indexing [0] would raise
        # IndexError. There is nothing to strip, so hand the text back as-is.
        return [], sql

    # Only the first parsed statement is inspected.
    p = statements[0]

    # Make sure the first meaningful token is "WITH" which is necessary to
    # define CTEs
    idx, tok = p.token_next(-1, skip_ws=True, skip_cm=True)
    if not (tok and tok.ttype == CTE):
        return [], sql

    # Get the next (meaningful) token, which should be the first CTE
    idx, tok = p.token_next(idx)
    if not tok:
        return ([], "")
    start_pos = token_start_pos(p.tokens, idx)
    ctes = []

    if isinstance(tok, IdentifierList):
        # Multiple ctes
        for t in tok.get_identifiers():
            # Offset of this identifier inside the list, added to the
            # list's own start position before being passed on.
            cte_start_offset = token_start_pos(tok.tokens, tok.token_index(t))
            cte = get_cte_from_token(t, start_pos + cte_start_offset)
            if cte:
                ctes.append(cte)
    elif isinstance(tok, Identifier):
        # A single CTE
        cte = get_cte_from_token(tok, start_pos)
        if cte:
            ctes.append(cte)

    # Collapse everything after the ctes into a remainder query. idx is
    # already the position of tok in p.tokens (returned by token_next), so
    # there is no need to search for it again.
    remainder = "".join(str(rest) for rest in p.tokens[idx + 1:])

    return ctes, remainder
| 95 | |
| 96 | |
| 97 | def get_cte_from_token(tok, pos0): |