(self, __context: Any)
| 832 | return mapping |
| 833 | |
def model_post_init(self, __context: Any) -> None:  # type: ignore[override]
    """Swap a psycopg driver in ``DATABASE_STRING`` for asyncpg on Windows.

    Runs the parent ``model_post_init`` first. The rewrite only happens when
    all of the following hold; otherwise ``DATABASE_STRING`` is left as is:

    * ``platform.system()`` reports ``"Windows"``;
    * ``DATABASE_STRING`` contains ``"://"``;
    * the part before ``"://"`` has the form ``<dialect>+<driver>`` and the
      driver is exactly ``psycopg`` or ``psycopg2``.

    When it does happen, a warning is logged and the attribute is replaced
    with ``<dialect>+asyncpg://<remainder>``; everything after ``"://"`` is
    carried over untouched.

    Args:
        __context: Passed straight through to the parent hook; not used here.
    """
    super().model_post_init(__context)
    if platform.system() != "Windows":
        return

    scheme, sep, remainder = self.DATABASE_STRING.partition("://")
    if not sep:
        # No "://" separator, so there is no scheme to inspect.
        return

    dialect, driver_sep, driver = scheme.partition("+")
    if not driver_sep or driver not in {"psycopg", "psycopg2"}:
        # Either no explicit driver, or a driver this hook does not rewrite.
        return

    # The driver is psycopg/psycopg2 at this point, so the rewritten URL can
    # never equal the current one; no "unchanged" check is needed.
    LOG.warning(
        "Detected Windows environment: switching DATABASE_STRING driver from psycopg to asyncpg "
        "for compatibility with the Proactor event loop policy."
    )
    # Written through object.__setattr__, which bypasses any __setattr__ the
    # class itself defines. NOTE(review): presumably to avoid the model's
    # assignment hooks -- confirm against the class definition.
    object.__setattr__(self, "DATABASE_STRING", f"{dialect}+asyncpg://{remainder}")
| 856 | |
def is_sqlite(self) -> bool:
    """Report whether ``DATABASE_STRING`` begins with the text ``sqlite``."""
    connection_url = self.DATABASE_STRING
    return connection_url.startswith("sqlite")
nothing calls this directly
no test coverage detected