MCPcopy Index your code
hub / github.com/pathwaycom/pathway / execute_sql

Method execute_sql

integration_tests/db_connectors/utils.py:1282–1341  ·  view source on GitHub ↗

Execute a single statement, retrying on deadlock victim (1205). Under heavy xdist parallelism every statement that touches CDC system metadata (sp_cdc_* procs, DDL on CDC-enabled objects, queries against cdc.* tables) competes with the capture agent for system-table locks

(
        self,
        query: str,
        params: tuple | None = None,
        drain_status_rows: bool = False,
        max_retries: int = 6,
        retry_delay: float = 0.5,
    )

Source from the content-addressed store, hash-verified

1280 return name
1281
1282 def execute_sql(
1283 self,
1284 query: str,
1285 params: tuple | None = None,
1286 drain_status_rows: bool = False,
1287 max_retries: int = 6,
1288 retry_delay: float = 0.5,
1289 ) -> None:
1290 """Execute a single statement, retrying on deadlock victim (1205).
1291
1292 Under heavy xdist parallelism every statement that touches CDC system
1293 metadata (sp_cdc_* procs, DDL on CDC-enabled objects, queries against
1294 cdc.* tables) competes with the capture agent for system-table locks
1295 and SQL Server occasionally elects it as the deadlock victim. Heavy
1296 DDL on master.sys.databases (CREATE/ALTER/DROP DATABASE) and on
1297 sys.objects/sys.schemas (CREATE/DROP TABLE/SCHEMA) is similarly
1298 vulnerable. The recommended response per Microsoft is to rerun the
1299 transaction, which is what we do here.
1300
1301 A fresh cursor is opened per attempt — pymssql leaves the cursor in a
1302 state where the next statement reports "Statement(s) could not be
1303 prepared" after a 1205. On success ``self.cursor`` is updated to the
1304 cursor that ran the statement, so callers can immediately
1305 ``self.cursor.fetchone()`` / ``self.cursor.fetchall()`` to read result
1306 rows from a SELECT.
1307
1308 ``drain_status_rows=True`` discards every rowset produced by the
1309 statement (use for sp_cdc_* procs that emit "(N rows affected)"
1310 rowsets pymssql leaves pending — the next statement on the same
1311 connection would otherwise fail with "Statement(s) could not be
1312 prepared"). Do not pass this for SELECT — it would discard the data.
1313
1314 Non-1205 exceptions propagate immediately — this is for transient
1315 contention, not for swallowing real errors.
1316 """
1317 last_exc: Exception | None = None
1318 for attempt in range(max_retries):
1319 cur = self.connection.cursor()
1320 try:
1321 if params is None:
1322 cur.execute(query)
1323 else:
1324 cur.execute(query, params)
1325 if drain_status_rows:
1326 while cur.nextset():
1327 pass
1328 self.cursor = cur
1329 return
1330 except pymssql.exceptions.OperationalError as e:
1331 if "1205" in str(e) and attempt < max_retries - 1:
1332 last_exc = e
1333 logging.warning(
1334 f"deadlock victim on attempt {attempt + 1}/"
1335 f"{max_retries}, retrying: {e}"
1336 )
1337 time.sleep(retry_delay * (1.25**attempt))
1338 continue
1339 raise

Calls

no outgoing calls