|
| 1 | +"""SSO login base dependency |
| 2 | +""" |
| 3 | +# pylint: disable=too-few-public-methods |
| 4 | + |
| 5 | +import json |
| 6 | +import sys |
| 7 | +import warnings |
| 8 | +from typing import Any, Dict, List, Optional |
| 9 | + |
| 10 | +import httpx |
| 11 | +import pydantic |
| 12 | +from oauthlib.oauth2 import WebApplicationClient |
| 13 | +from starlette.exceptions import HTTPException |
| 14 | +from starlette.requests import Request |
| 15 | +from starlette.responses import RedirectResponse |
| 16 | + |
| 17 | +if sys.version_info >= (3, 8): |
| 18 | + from typing import TypedDict |
| 19 | +else: |
| 20 | + from typing_extensions import TypedDict |
| 21 | + |
| 22 | +DiscoveryDocument = TypedDict( |
| 23 | + "DiscoveryDocument", {"authorization_endpoint": str, "token_endpoint": str, "userinfo_endpoint": str} |
| 24 | +) |
| 25 | + |
| 26 | + |
| 27 | +class UnsetStateWarning(UserWarning): |
| 28 | + """Warning about unset state parameter""" |
| 29 | + |
| 30 | + |
| 31 | +class SSOLoginError(HTTPException): |
| 32 | + """Raised when any login-related error ocurrs |
| 33 | + (such as when user is not verified or if there was an attempt for fake login) |
| 34 | + """ |
| 35 | + |
| 36 | + |
| 37 | +class OpenID(pydantic.BaseModel): # pylint: disable=no-member |
| 38 | + """Class (schema) to represent information got from sso provider in a common form.""" |
| 39 | + |
| 40 | + id: Optional[str] = None |
| 41 | + email: Optional[str] = None |
| 42 | + first_name: Optional[str] = None |
| 43 | + last_name: Optional[str] = None |
| 44 | + display_name: Optional[str] = None |
| 45 | + picture: Optional[str] = None |
| 46 | + provider: Optional[str] = None |
| 47 | + |
| 48 | + |
| 49 | +# pylint: disable=too-many-instance-attributes |
| 50 | +class SSOBase: |
| 51 | + """Base class (mixin) for all SSO providers""" |
| 52 | + |
| 53 | + provider: str = NotImplemented |
| 54 | + client_id: str = NotImplemented |
| 55 | + client_secret: str = NotImplemented |
| 56 | + redirect_uri: Optional[str] = NotImplemented |
| 57 | + scope: List[str] = NotImplemented |
| 58 | + _oauth_client: Optional[WebApplicationClient] = None |
| 59 | + additional_headers: Optional[Dict[str, Any]] = None |
| 60 | + |
| 61 | + def __init__( |
| 62 | + self, |
| 63 | + client_id: str, |
| 64 | + client_secret: str, |
| 65 | + redirect_uri: Optional[str] = None, |
| 66 | + allow_insecure_http: bool = False, |
| 67 | + use_state: bool = False, |
| 68 | + scope: Optional[List[str]] = None, |
| 69 | + ): |
| 70 | + # pylint: disable=too-many-arguments |
| 71 | + self.client_id = client_id |
| 72 | + self.client_secret = client_secret |
| 73 | + self.redirect_uri = redirect_uri |
| 74 | + self.allow_insecure_http = allow_insecure_http |
| 75 | + # TODO: Remove use_state argument and attribute |
| 76 | + if use_state: |
| 77 | + warnings.warn( |
| 78 | + ( |
| 79 | + "Argument 'use_state' of SSOBase's constructor is deprecated and will be removed in " |
| 80 | + "future releases. Use 'state' argument of individual methods instead." |
| 81 | + ), |
| 82 | + DeprecationWarning, |
| 83 | + ) |
| 84 | + self.scope = scope or self.scope |
| 85 | + self._refresh_token: Optional[str] = None |
| 86 | + self._state: Optional[str] = None |
| 87 | + |
| 88 | + @property |
| 89 | + def state(self) -> Optional[str]: |
| 90 | + """Gets state as it was returned from the server""" |
| 91 | + if self._state is None: |
| 92 | + warnings.warn( |
| 93 | + "'state' parameter is unset. This means the server either " |
| 94 | + "didn't return state (was this expected?) or 'verify_and_process' hasn't been called yet.", |
| 95 | + UnsetStateWarning, |
| 96 | + ) |
| 97 | + return self._state |
| 98 | + |
| 99 | + @property |
| 100 | + def oauth_client(self) -> WebApplicationClient: |
| 101 | + """OAuth Client to help us generate requests and parse responses""" |
| 102 | + if self.client_id == NotImplemented: |
| 103 | + raise NotImplementedError(f"Provider {self.provider} not supported") |
| 104 | + if self._oauth_client is None: |
| 105 | + self._oauth_client = WebApplicationClient(self.client_id) |
| 106 | + return self._oauth_client |
| 107 | + |
| 108 | + @property |
| 109 | + def access_token(self) -> Optional[str]: |
| 110 | + """Access token from token endpoint""" |
| 111 | + return self.oauth_client.access_token |
| 112 | + |
| 113 | + @property |
| 114 | + def refresh_token(self) -> Optional[str]: |
| 115 | + """Get refresh token (if returned from provider)""" |
| 116 | + return self._refresh_token or self.oauth_client.refresh_token |
| 117 | + |
| 118 | + @classmethod |
| 119 | + async def openid_from_response(cls, response: dict) -> OpenID: |
| 120 | + """Return {OpenID} object from provider's user info endpoint response""" |
| 121 | + raise NotImplementedError(f"Provider {cls.provider} not supported") |
| 122 | + |
| 123 | + async def get_discovery_document(self) -> DiscoveryDocument: |
| 124 | + """Get discovery document containing handy urls""" |
| 125 | + raise NotImplementedError(f"Provider {self.provider} not supported") |
| 126 | + |
| 127 | + @property |
| 128 | + async def authorization_endpoint(self) -> Optional[str]: |
| 129 | + """Return `authorization_endpoint` from discovery document""" |
| 130 | + discovery = await self.get_discovery_document() |
| 131 | + return discovery.get("authorization_endpoint") |
| 132 | + |
| 133 | + @property |
| 134 | + async def token_endpoint(self) -> Optional[str]: |
| 135 | + """Return `token_endpoint` from discovery document""" |
| 136 | + discovery = await self.get_discovery_document() |
| 137 | + return discovery.get("token_endpoint") |
| 138 | + |
| 139 | + @property |
| 140 | + async def userinfo_endpoint(self) -> Optional[str]: |
| 141 | + """Return `userinfo_endpoint` from discovery document""" |
| 142 | + discovery = await self.get_discovery_document() |
| 143 | + return discovery.get("userinfo_endpoint") |
| 144 | + |
| 145 | + async def get_login_url( |
| 146 | + self, |
| 147 | + *, |
| 148 | + redirect_uri: Optional[str] = None, |
| 149 | + params: Optional[Dict[str, Any]] = None, |
| 150 | + state: Optional[str] = None, |
| 151 | + ) -> str: |
| 152 | + """Return prepared login url. This is low-level, see {get_login_redirect} instead.""" |
| 153 | + params = params or {} |
| 154 | + redirect_uri = redirect_uri or self.redirect_uri |
| 155 | + if redirect_uri is None: |
| 156 | + raise ValueError("redirect_uri must be provided, either at construction or request time") |
| 157 | + request_uri = self.oauth_client.prepare_request_uri( |
| 158 | + await self.authorization_endpoint, redirect_uri=redirect_uri, state=state, scope=self.scope, **params |
| 159 | + ) |
| 160 | + return request_uri |
| 161 | + |
| 162 | + async def get_login_redirect( |
| 163 | + self, |
| 164 | + *, |
| 165 | + redirect_uri: Optional[str] = None, |
| 166 | + params: Optional[Dict[str, Any]] = None, |
| 167 | + state: Optional[str] = None, |
| 168 | + ) -> RedirectResponse: |
| 169 | + """Return redirect response by Stalette to login page of Oauth SSO provider |
| 170 | +
|
| 171 | + Arguments: |
| 172 | + redirect_uri {Optional[str]} -- Override redirect_uri specified on this instance (default: None) |
| 173 | + params {Optional[Dict[str, Any]]} -- Add additional query parameters to the login request. |
| 174 | + state {Optional[str]} -- Add state parameter. This is useful if you want |
| 175 | + the server to return something specific back to you. |
| 176 | +
|
| 177 | + Returns: |
| 178 | + RedirectResponse -- Starlette response (may directly be returned from FastAPI) |
| 179 | + """ |
| 180 | + login_uri = await self.get_login_url(redirect_uri=redirect_uri, params=params, state=state) |
| 181 | + response = RedirectResponse(login_uri, 303) |
| 182 | + return response |
| 183 | + |
| 184 | + async def verify_and_process( |
| 185 | + self, |
| 186 | + request: Request, |
| 187 | + *, |
| 188 | + params: Optional[Dict[str, Any]] = None, |
| 189 | + headers: Optional[Dict[str, Any]] = None, |
| 190 | + redirect_uri: Optional[str] = None, |
| 191 | + ) -> Optional[OpenID]: |
| 192 | + """Get FastAPI (Starlette) Request object and process login. |
| 193 | + This handler should be used for your /callback path. |
| 194 | +
|
| 195 | + Arguments: |
| 196 | + request {Request} -- FastAPI request object (or Starlette) |
| 197 | + params {Optional[Dict[str, Any]]} -- Optional additional query parameters to pass to the provider |
| 198 | +
|
| 199 | + Returns: |
| 200 | + Optional[OpenID] -- OpenID if the login was successfull |
| 201 | + """ |
| 202 | + headers = headers or {} |
| 203 | + code = request.query_params.get("code") |
| 204 | + if code is None: |
| 205 | + raise SSOLoginError(400, "'code' parameter was not found in callback request") |
| 206 | + self._state = request.query_params.get("state") |
| 207 | + return await self.process_login( |
| 208 | + code, request, params=params, additional_headers=headers, redirect_uri=redirect_uri |
| 209 | + ) |
| 210 | + |
| 211 | + async def process_login( |
| 212 | + self, |
| 213 | + code: str, |
| 214 | + request: Request, |
| 215 | + *, |
| 216 | + params: Optional[Dict[str, Any]] = None, |
| 217 | + additional_headers: Optional[Dict[str, Any]] = None, |
| 218 | + redirect_uri: Optional[str] = None, |
| 219 | + ) -> Optional[OpenID]: |
| 220 | + """This method should be called from callback endpoint to verify the user and request user info endpoint. |
| 221 | + This is low level, you should use {verify_and_process} instead. |
| 222 | +
|
| 223 | + Arguments: |
| 224 | + params {Optional[Dict[str, Any]]} -- Optional additional query parameters to pass to the provider |
| 225 | + additional_headers {Optional[Dict[str, Any]]} -- Optional additional headers to be added to all requests |
| 226 | + """ |
| 227 | + # pylint: disable=too-many-locals |
| 228 | + params = params or {} |
| 229 | + additional_headers = additional_headers or {} |
| 230 | + additional_headers.update(self.additional_headers or {}) |
| 231 | + url = request.url |
| 232 | + scheme = url.scheme |
| 233 | + if not self.allow_insecure_http and scheme != "https": |
| 234 | + current_url = str(url).replace("http://", "https://") |
| 235 | + scheme = "https" |
| 236 | + else: |
| 237 | + current_url = str(url) |
| 238 | + current_path = f"{scheme}://{url.netloc}{url.path}" |
| 239 | + |
| 240 | + token_url, headers, body = self.oauth_client.prepare_token_request( |
| 241 | + await self.token_endpoint, |
| 242 | + authorization_response=current_url, |
| 243 | + redirect_url=redirect_uri or self.redirect_uri or current_path, |
| 244 | + code=code, |
| 245 | + **params, |
| 246 | + ) # type: ignore |
| 247 | + |
| 248 | + if token_url is None: |
| 249 | + return None |
| 250 | + |
| 251 | + headers.update(additional_headers) |
| 252 | + |
| 253 | + auth = httpx.BasicAuth(self.client_id, self.client_secret) |
| 254 | + async with httpx.AsyncClient() as session: |
| 255 | + response = await session.post(token_url, headers=headers, content=body, auth=auth) |
| 256 | + content = response.json() |
| 257 | + self._refresh_token = content.get("refresh_token") |
| 258 | + self.oauth_client.parse_request_body_response(json.dumps(content)) |
| 259 | + |
| 260 | + uri, headers, _ = self.oauth_client.add_token(await self.userinfo_endpoint) |
| 261 | + response = await session.get(uri, headers=headers) |
| 262 | + content = response.json() |
| 263 | + |
| 264 | + return await self.openid_from_response(content) |
0 commit comments