MCPcopy Index your code
hub / github.com/slackapi/python-slack-sdk / oauth_callback

Function oauth_callback

integration_tests/samples/oauth/oauth_v2.py:51–109  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

49
50@app.route("/slack/oauth_redirect", methods=["GET"])
51def oauth_callback():
52 # Retrieve the auth code and state from the request params
53 if "code" in request.args:
54 state = request.args["state"]
55 if state_store.consume(state):
56 code = request.args["code"]
57 client = WebClient() # no prepared token needed for this app
58 oauth_response = client.oauth_v2_access(client_id=client_id, client_secret=client_secret, code=code)
59 logger.info(f"oauth.v2.access response: {oauth_response}")
60
61 installed_enterprise = oauth_response.get("enterprise") or {}
62 is_enterprise_install = oauth_response.get("is_enterprise_install")
63 installed_team = oauth_response.get("team") or {}
64 installer = oauth_response.get("authed_user") or {}
65 incoming_webhook = oauth_response.get("incoming_webhook") or {}
66
67 bot_token = oauth_response.get("access_token")
68 # NOTE: oauth.v2.access doesn't include bot_id in response
69 bot_id = None
70 enterprise_url = None
71 if bot_token is not None:
72 auth_test = client.auth_test(token=bot_token)
73 bot_id = auth_test["bot_id"]
74 if is_enterprise_install is True:
75 enterprise_url = auth_test.get("url")
76
77 installation = Installation(
78 app_id=oauth_response.get("app_id"),
79 enterprise_id=installed_enterprise.get("id"),
80 enterprise_name=installed_enterprise.get("name"),
81 enterprise_url=enterprise_url,
82 team_id=installed_team.get("id"),
83 team_name=installed_team.get("name"),
84 bot_token=bot_token,
85 bot_id=bot_id,
86 bot_user_id=oauth_response.get("bot_user_id"),
87 bot_scopes=oauth_response.get("scope"), # comma-separated string
88 user_id=installer.get("id"),
89 user_token=installer.get("access_token"),
90 user_scopes=installer.get("scope"), # comma-separated string
91 incoming_webhook_url=incoming_webhook.get("url"),
92 incoming_webhook_channel=incoming_webhook.get("channel"),
93 incoming_webhook_channel_id=incoming_webhook.get("channel_id"),
94 incoming_webhook_configuration_url=incoming_webhook.get("configuration_url"),
95 is_enterprise_install=is_enterprise_install,
96 token_type=oauth_response.get("token_type"),
97 )
98 installation_store.save(installation)
99 return redirect_page_renderer.render_success_page(
100 app_id=installation.app_id,
101 team_id=installation.team_id,
102 is_enterprise_install=installation.is_enterprise_install,
103 enterprise_url=installation.enterprise_url,
104 )
105 else:
106 return redirect_page_renderer.render_failure_page("the state value is already expired")
107
108 error = request.args["error"] if "error" in request.args else ""

Callers

nothing calls this directly

Calls 9

oauth_v2_access · Method · 0.95
auth_test · Method · 0.95
WebClient · Class · 0.90
Installation · Class · 0.90
render_success_page · Method · 0.80
render_failure_page · Method · 0.80
consume · Method · 0.45
get · Method · 0.45
save · Method · 0.45

Tested by

no test coverage detected