Clear existing flows and persist new ones. Returns the number of flows stored.
(store: GraphStore, flows: list[dict])
| 383 | |
| 384 | |
| 385 | def store_flows(store: GraphStore, flows: list[dict]) -> int: |
| 386 | """Clear existing flows and persist new ones. |
| 387 | |
| 388 | Returns the number of flows stored. |
| 389 | """ |
| 390 | # NOTE: store_flows uses _conn directly because it performs |
| 391 | # multi-statement batch writes (DELETE + INSERT loop) that are |
| 392 | # tightly coupled to the DB transaction lifecycle. |
| 393 | conn = store._conn |
| 394 | |
| 395 | if conn.in_transaction: |
| 396 | logger.warning("Rolling back uncommitted transaction before BEGIN IMMEDIATE") |
| 397 | conn.rollback() |
| 398 | # Wrap the full DELETE + INSERT sequence in an explicit transaction |
| 399 | # so partial writes cannot occur if an exception interrupts the loop. |
| 400 | conn.execute("BEGIN IMMEDIATE") |
| 401 | try: |
| 402 | conn.execute("DELETE FROM flow_memberships") |
| 403 | conn.execute("DELETE FROM flows") |
| 404 | |
| 405 | count = 0 |
| 406 | for flow in flows: |
| 407 | path_json = json.dumps(flow.get("path", [])) |
| 408 | conn.execute( |
| 409 | """INSERT INTO flows |
| 410 | (name, entry_point_id, depth, node_count, file_count, |
| 411 | criticality, path_json) |
| 412 | VALUES (?, ?, ?, ?, ?, ?, ?)""", |
| 413 | ( |
| 414 | flow["name"], |
| 415 | flow["entry_point_id"], |
| 416 | flow["depth"], |
| 417 | flow["node_count"], |
| 418 | flow["file_count"], |
| 419 | flow["criticality"], |
| 420 | path_json, |
| 421 | ), |
| 422 | ) |
| 423 | flow_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] |
| 424 | |
| 425 | # Insert memberships. |
| 426 | node_ids = flow.get("path", []) |
| 427 | for position, node_id in enumerate(node_ids): |
| 428 | conn.execute( |
| 429 | "INSERT OR IGNORE INTO flow_memberships (flow_id, node_id, position) " |
| 430 | "VALUES (?, ?, ?)", |
| 431 | (flow_id, node_id, position), |
| 432 | ) |
| 433 | count += 1 |
| 434 | |
| 435 | conn.commit() |
| 436 | except BaseException: |
| 437 | conn.rollback() |
| 438 | raise |
| 439 | return count |
| 440 | |
| 441 | |
| 442 | def incremental_trace_flows( |