MCPcopy Index your code
hub / github.com/MagicStack/asyncpg / _format_copy_opts

Method _format_copy_opts

asyncpg/connection.py:1142–1169  ·  view source on GitHub ↗
(self, *, format=None, oids=None, freeze=None,
                          delimiter=None, null=None, header=None, quote=None,
                          escape=None, force_quote=None, force_not_null=None,
                          force_null=None, encoding=None)

Source from the content-addressed store, hash-verified

1140 return where_clause
1141
1142 def _format_copy_opts(self, *, format=None, oids=None, freeze=None,
1143 delimiter=None, null=None, header=None, quote=None,
1144 escape=None, force_quote=None, force_not_null=None,
1145 force_null=None, encoding=None):
1146 kwargs = dict(locals())
1147 kwargs.pop('self')
1148 opts = []
1149
1150 if force_quote is not None and isinstance(force_quote, bool):
1151 kwargs.pop('force_quote')
1152 if force_quote:
1153 opts.append('FORCE_QUOTE *')
1154
1155 for k, v in kwargs.items():
1156 if v is not None:
1157 if k in ('force_not_null', 'force_null', 'force_quote'):
1158 v = '(' + ', '.join(utils._quote_ident(c) for c in v) + ')'
1159 elif k in ('oids', 'freeze', 'header'):
1160 v = str(v)
1161 else:
1162 v = utils._quote_literal(v)
1163
1164 opts.append('{} {}'.format(k.upper(), v))
1165
1166 if opts:
1167 return '(' + ', '.join(opts) + ')'
1168 else:
1169 return ''
1170
1171 async def _copy_out(self, copy_stmt, output, timeout):
1172 try:

Callers 3

copy_from_tableMethod · 0.95
copy_from_queryMethod · 0.95
copy_to_tableMethod · 0.95

Calls 1

upperMethod · 0.80

Tested by

no test coverage detected