(cursor: Cursor, query, parameters)
| 97 | |
| 98 | |
def execute_query(cursor: Cursor, query, parameters):
    """Execute *query* on *cursor* and return a value describing its outcome.

    Args:
        cursor: Cursor used to run the statement.
        query: Statement passed to ``cursor.execute``.
        parameters: Values bound to the statement. Any falsy value
            (``None``, empty sequence, empty mapping) causes the statement
            to be executed without a parameters argument.

    Returns:
        * ``cursor.fetchall()`` when ``cursor.description`` is truthy;
        * ``None`` when there is no description and ``cursor.rowcount``
          is ``-1``;
        * ``[(cursor.rowcount,)]`` (a single one-column row) otherwise;
        * ``cursor.statusmessage`` if reading the result raises
          ``psycopg.errors.ProgrammingError``.
    """
    # Only hand parameters to the driver when there is something to bind.
    if not parameters:
        cursor.execute(query)
    else:
        cursor.execute(query, parameters)

    try:
        # A truthy description means the cursor has rows to hand back.
        if cursor.description:
            return cursor.fetchall()
        # No description: report the row count in row-like form, unless the
        # cursor reports -1 for it.
        return None if cursor.rowcount == -1 else [(cursor.rowcount,)]
    except psycopg.errors.ProgrammingError:
        # Fall back to the cursor's status message when the result cannot
        # be read.
        return cursor.statusmessage
| 113 | |
| 114 | |
| 115 | def execute_steps(steps, variables, cursor: Cursor, connection: Connection): |
no test coverage detected
searching dependent graphs…