Identifies whether a SQL statement is a DDL, DML, or DQL statement.
(
sql_statement: str, dialect: SQLGLOT_DIALECTS | None = None
)
| 20 | |
| 21 | |
| 22 | def classify_sql_statement( |
| 23 | sql_statement: str, dialect: SQLGLOT_DIALECTS | None = None |
| 24 | ) -> SQL_TYPE | Literal["unknown"]: |
| 25 | """ |
| 26 | Identifies whether a SQL statement is a DDL, DML, or DQL statement. |
| 27 | """ |
| 28 | DependencyManager.sqlglot.require(why="SQL parsing") |
| 29 | |
| 30 | from sqlglot import exp, parse |
| 31 | from sqlglot.errors import ParseError |
| 32 | |
| 33 | sql_statement = sql_statement.strip().lower() |
| 34 | try: |
| 35 | with _loggers.suppress_warnings_logs("sqlglot"): |
| 36 | expression_list = parse(sql_statement, dialect=dialect) |
| 37 | except ParseError as e: |
| 38 | log_sql_error( |
| 39 | LOGGER.debug, |
| 40 | message="Failed to parse SQL statement for classification.", |
| 41 | exception=e, |
| 42 | rule_code="MF005", |
| 43 | node=None, |
| 44 | sql_content=sql_statement, |
| 45 | ) |
| 46 | return "unknown" |
| 47 | |
| 48 | for expression in expression_list: |
| 49 | if expression is None: |
| 50 | continue |
| 51 | |
| 52 | if bool( |
| 53 | expression.find( |
| 54 | exp.Create, exp.Drop, exp.Alter, exp.Attach, exp.Detach |
| 55 | ) |
| 56 | ): |
| 57 | return "DDL" |
| 58 | elif bool(expression.find(exp.Insert, exp.Update, exp.Delete)): |
| 59 | return "DML" |
| 60 | else: |
| 61 | return "DQL" |
| 62 | |
| 63 | return "unknown" |
searching dependent graphs…