MCPcopy
hub / github.com/slackapi/python-slack-sdk / _sync_send

Method _sync_send

slack_sdk/web/base_client.py:175–208  ·  view source on GitHub ↗
(self, api_url, req_args)

Source from the content-addressed store, hash-verified

173 # =================================================================
174
def _sync_send(self, api_url, req_args) -> SlackResponse:
    """Unpack ``req_args`` and issue the request via ``_urllib_api_call``.

    Args:
        api_url: URL passed through unchanged to ``_urllib_api_call``.
        req_args: Request options. The keys read here are ``params``,
            ``data``, ``files``, ``json``, ``headers`` and ``auth``;
            any key that is absent is treated as ``None``.

    Returns:
        The value returned by ``_urllib_api_call``.

    Raises:
        KeyError: If ``auth`` is a dict lacking ``client_id`` or
            ``client_secret``.
    """
    params = req_args.get("params")
    data = req_args.get("data")
    files = req_args.get("files")
    json_payload = req_args.get("json")
    headers = req_args.get("headers")
    token = params.get("token") if params else None

    # Basic Auth for oauth.v2.access / oauth.access
    auth = req_args.get("auth")
    # NOTE: whenever ``auth`` is given, the headers taken from ``req_args``
    # are replaced (not merged) - even when the auth value ends up unsupported.
    if isinstance(auth, str):
        # A string is used verbatim as the Authorization header value.
        headers = {"Authorization": auth}
    elif isinstance(auth, dict):
        client_id = auth["client_id"]
        client_secret = auth["client_secret"]
        credentials = f"{client_id}:{client_secret}".encode("utf-8")
        encoded = b64encode(credentials).decode("ascii")
        headers = {"Authorization": f"Basic {encoded}"}
    elif auth is not None:
        headers = {}
        self._logger.warning(f"As the auth: {auth}: {type(auth)} is unsupported, skipped")

    # Both ``params`` and ``data`` go into the request body; ``data`` is
    # applied last, so it wins on duplicate keys.
    body_params = {}
    for part in (params, data):
        if part:
            body_params.update(part)

    return self._urllib_api_call(
        token=token,
        url=api_url,
        query_params={},
        body_params=body_params,
        files=files,  # type: ignore[arg-type]
        json_body=json_payload,  # type: ignore[arg-type]
        additional_headers=headers,  # type: ignore[arg-type]
    )
209
210 def _request_for_pagination(self, api_url: str, req_args: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
211 """This method is supposed to be used only for SlackResponse pagination

Callers 1

api_callMethod · 0.95

Calls 2

_urllib_api_callMethod · 0.95
getMethod · 0.45

Tested by

no test coverage detected