(
self,
database: str | None = None,
user: str | None = None,
password: str | None = None,
host: str | None = None,
port: int | None = None,
socket: str | None = None,
character_set: str | None = None,
local_infile: bool | None = None,
ssl: dict[str, Any] | None = None,
init_command: str | None = None,
unbuffered: bool | None = None,
)
| 185 | self.connect() |
| 186 | |
| 187 | def connect( |
| 188 | self, |
| 189 | database: str | None = None, |
| 190 | user: str | None = None, |
| 191 | password: str | None = None, |
| 192 | host: str | None = None, |
| 193 | port: int | None = None, |
| 194 | socket: str | None = None, |
| 195 | character_set: str | None = None, |
| 196 | local_infile: bool | None = None, |
| 197 | ssl: dict[str, Any] | None = None, |
| 198 | init_command: str | None = None, |
| 199 | unbuffered: bool | None = None, |
| 200 | ): |
| 201 | db = database if database is not None else self.dbname |
| 202 | user = user if user is not None else self.user |
| 203 | password = password if password is not None else self.password |
| 204 | host = host if host is not None else self.host |
| 205 | port = port if port is not None else self.port |
| 206 | socket = socket if socket is not None else self.socket |
| 207 | character_set = character_set if character_set is not None else self.character_set |
| 208 | local_infile = local_infile if local_infile is not None else self.local_infile |
| 209 | ssl = ssl if ssl is not None else self.ssl |
| 210 | init_command = init_command if init_command is not None else self.init_command |
| 211 | unbuffered = unbuffered if unbuffered is not None else self.unbuffered |
| 212 | _logger.debug( |
| 213 | "Connection DB Params: \n" |
| 214 | "\tdatabase: %r" |
| 215 | "\tuser: %r" |
| 216 | "\thost: %r" |
| 217 | "\tport: %r" |
| 218 | "\tsocket: %r" |
| 219 | "\tcharacter_set: %r" |
| 220 | "\tlocal_infile: %r" |
| 221 | "\tssl: %r" |
| 222 | "\tinit_command: %r" |
| 223 | "\tunbuffered: %r", |
| 224 | db, |
| 225 | user, |
| 226 | host, |
| 227 | port, |
| 228 | socket, |
| 229 | character_set, |
| 230 | local_infile, |
| 231 | ssl, |
| 232 | init_command, |
| 233 | unbuffered, |
| 234 | ) |
| 235 | conv = conversions.copy() |
| 236 | conv.update({ |
| 237 | FIELD_TYPE.TIMESTAMP: lambda obj: convert_datetime(obj) or obj, |
| 238 | FIELD_TYPE.DATETIME: lambda obj: convert_datetime(obj) or obj, |
| 239 | FIELD_TYPE.TIME: lambda obj: convert_time(obj) or obj, |
| 240 | FIELD_TYPE.DATE: lambda obj: convert_date(obj) or obj, |
| 241 | }) |
| 242 | |
| 243 | # TODO: is defer_connect still needed? It used to be conditional on SSH tunnels. |
| 244 | defer_connect = False |
no test coverage detected