Extract Redis connection configuration to compare connections. Args: connection — Redis connection instance. Returns: dictionary of connection parameters for comparison.
(self, connection: Redis)
| 48 | self._connection_config = None |
| 49 | |
def _get_connection_config(self, connection: Redis) -> dict[str, Any]:
    """
    Extract Redis connection configuration to compare connections.

    Reads ``connection.connection_pool.connection_kwargs`` and keeps only
    the parameters that identify the target server and the credentials
    used. Parameters that are absent from the pool's kwargs are omitted
    from the result rather than reported as ``None``.

    Args:
        connection: Redis connection instance

    Returns:
        Dictionary of connection parameters for comparison
    """
    kwargs = connection.connection_pool.connection_kwargs

    # Only compare essential connection parameters that determine if
    # two connections are to the same Redis instance. 'path' is included
    # because Unix-domain-socket connections are addressed by socket path
    # instead of host/port; without it, connections to two different
    # sockets would produce identical configs and compare as equal.
    essential_params = ('host', 'port', 'path', 'db', 'username', 'password')
    return {key: kwargs[key] for key in essential_params if key in kwargs}
| 66 | |
| 67 | def register( |
| 68 | self, |
no outgoing calls
no test coverage detected