# Source code for codegrade._api.otp_login

"""The endpoints for otp_login objects.

SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""

from __future__ import annotations

import os
import typing as t

import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable

from .. import paginated, parsers, utils

if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
    from .. import client
    from ..models.confirm_otp_login_data import ConfirmOTPLoginData
    from ..models.create_otp_login_data import CreateOTPLoginData
    from ..models.otp_login_multi_response import OTPLoginMultiResponse
    from ..models.otp_login_response import OTPLoginResponse
    from ..models.otp_login_single_response import OTPLoginSingleResponse
    from ..models.otp_loginable_user import OTPLoginableUser
    from ..models.select_otp_login_data import SelectOTPLoginData


# Type variable for the client type a service class is parameterised over.
# The bound is given as a string forward reference because ``client`` is only
# imported when type checking (or when CG_EAGERIMPORT is set in the
# environment).
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")


class OTPLoginService(t.Generic[_ClientT]):
    """Endpoints for ``otp_login`` objects, bound to a single client.

    Every method sends its request through the client handed to the
    constructor; the client is entered as a context manager for the duration
    of each HTTP call.
    """

    __slots__ = ("__client",)

    def __init__(self, client: _ClientT) -> None:
        """Store the client used to perform all requests of this service.

        :param client: The client whose ``http`` attribute performs the
            requests.
        """
        self.__client = client

    def confirm(
        self,
        json_body: ConfirmOTPLoginData,
        *,
        otp_id: str,
    ) -> t.Union[OTPLoginSingleResponse, OTPLoginMultiResponse]:
        """Verify the OTP code.

        Checks the code that was sent to the user's email. When exactly one
        active user matches, a login response with an access token is
        returned and the session is consumed. When several users match,
        ``{"type": "select"}`` is returned; continue with `otp_login.list`
        and `otp_login.select` to pick one. When no user matches a 404 is
        returned (the same as for a wrong code, anti-enumeration). Up to 5
        attempts are allowed before the session is locked.

        :param json_body: The body of the request. See
            :class:`.ConfirmOTPLoginData` for information about the possible
            fields. You can provide this data as a
            :class:`.ConfirmOTPLoginData` or as a dictionary.
        :param otp_id: The OTP session identifier.

        :returns: Either a LoginResponse or a selection marker.
        :raises: The error built by ``utils.get_error`` for any response
            whose status code does not match 200.
        """
        endpoint = f"/api/v1/otp_logins/{otp_id}/confirm"

        with self.__client as session:
            response = session.http.post(
                url=endpoint,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is imported lazily, only on the failure path.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.otp_login_multi_response import OTPLoginMultiResponse
        from ..models.otp_login_single_response import OTPLoginSingleResponse

        # The single-user parser is built (and listed) before the multi-user
        # one, so the union keeps its original member order.
        single_parser = parsers.ParserFor.make(OTPLoginSingleResponse)
        multi_parser = parsers.ParserFor.make(OTPLoginMultiResponse)
        return parsers.JsonResponseParser(
            parsers.make_union(single_parser, multi_parser)
        ).try_parse(response)

    def create(
        self,
        json_body: CreateOTPLoginData,
    ) -> OTPLoginResponse:
        """Create an OTP login session and send a code to the given email.

        This is the first step of the OTP login flow. Afterwards call
        `otp_login.confirm` with the code from the email: for a single-user
        match that returns a login response directly, for a multi-user match
        use `otp_login.list` and `otp_login.select`.

        The session is bound to the creating client's IP. A valid
        proof-of-work solution is required to prevent automated abuse.
        Success is always returned, regardless of whether matching users
        exist (anti-enumeration).

        :param json_body: The body of the request. See
            :class:`.CreateOTPLoginData` for information about the possible
            fields. You can provide this data as a
            :class:`.CreateOTPLoginData` or as a dictionary.

        :returns: An OTP session identifier.
        :raises: The error built by ``utils.get_error`` for any response
            whose status code does not match 200.
        """
        endpoint = "/api/v1/otp_logins"

        with self.__client as session:
            response = session.http.post(
                url=endpoint,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is imported lazily, only on the failure path.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.otp_login_response import OTPLoginResponse

        session_parser = parsers.ParserFor.make(OTPLoginResponse)
        return parsers.JsonResponseParser(session_parser).try_parse(response)

    def list(
        self,
        *,
        otp_id: str,
        page_size: int = 20,
        cg_otp_login_code: str,
    ) -> paginated.Response[OTPLoginableUser]:
        """List users matching this OTP session (paginated).

        Only available after `otp_login.confirm` returned
        ``{"type": "select"}``. The OTP code must be supplied; it is sent in
        the `Cg-Otp-Login-Code` header for authentication. The session is not
        consumed, so this can be called repeatedly until the session expires.

        Each user includes a `can_login` flag indicating whether they are
        allowed to log in directly (users managed by an external system like
        LTI/SSO may not be).

        :param otp_id: The OTP session identifier.
        :param page_size: The size of a single page, maximum is 50.
        :param cg_otp_login_code: The OTP code, sent (converted with ``str``)
            as the ``cg-otp-login-code`` request header.

        :returns: Paginated list of matching users. The requests themselves
            are made through the two callbacks handed to
            ``paginated.Response``.
        """
        endpoint = f"/api/v1/otp_logins/{otp_id}/users/"
        request_headers: t.Dict[str, str] = {
            "cg-otp-login-code": str(cg_otp_login_code),
        }
        # Shared by every page request; only "next-token" changes per page.
        query: t.Dict[str, str | int | bool] = {"page-size": page_size}

        if t.TYPE_CHECKING:
            import httpx

        def fetch_page(next_token: str | None) -> httpx.Response:
            # Without a token, any token left over from an earlier call is
            # dropped from the shared query parameters.
            if next_token is not None:
                query["next-token"] = next_token
            else:
                query.pop("next-token", "")

            with self.__client as session:
                resp = session.http.get(
                    url=endpoint,
                    headers=request_headers,
                    params=query,
                )
            utils.log_warnings(resp)
            return resp

        def parse_page(
            resp: httpx.Response,
        ) -> t.Sequence[OTPLoginableUser]:
            if not utils.response_code_matches(resp.status_code, 200):
                # The error model is imported lazily, only on failure.
                from ..models.any_error import AnyError

                raise utils.get_error(
                    resp,
                    (
                        (
                            (400, 409, 401, 403, 404, 429, 500),
                            utils.unpack_union(AnyError),
                        ),
                    ),
                )

            from ..models.otp_loginable_user import OTPLoginableUser

            users_parser = rqa.List(parsers.ParserFor.make(OTPLoginableUser))
            return parsers.JsonResponseParser(users_parser).try_parse(resp)

        return paginated.Response(fetch_page, parse_page)

    def select(
        self,
        json_body: SelectOTPLoginData,
        *,
        otp_id: str,
    ) -> OTPLoginSingleResponse:
        """Select a user from the multi-user list and log in.

        Only available after `otp_login.confirm` returned
        ``{"type": "select"}``. Requires the OTP code for re-authentication.
        The session is consumed: a second call will return 404.

        :param json_body: The body of the request. See
            :class:`.SelectOTPLoginData` for information about the possible
            fields. You can provide this data as a
            :class:`.SelectOTPLoginData` or as a dictionary.
        :param otp_id: The OTP session identifier.

        :returns: A login response for the selected user.
        :raises: The error built by ``utils.get_error`` for any response
            whose status code does not match 200.
        """
        endpoint = f"/api/v1/otp_logins/{otp_id}/select"

        with self.__client as session:
            response = session.http.post(
                url=endpoint,
                json=utils.to_dict(json_body),
                params=None,
            )
        utils.log_warnings(response)

        if not utils.response_code_matches(response.status_code, 200):
            # The error model is imported lazily, only on the failure path.
            from ..models.any_error import AnyError

            raise utils.get_error(
                response,
                (
                    (
                        (400, 409, 401, 403, 404, 429, 500),
                        utils.unpack_union(AnyError),
                    ),
                ),
            )

        from ..models.otp_login_single_response import OTPLoginSingleResponse

        login_parser = parsers.ParserFor.make(OTPLoginSingleResponse)
        return parsers.JsonResponseParser(login_parser).try_parse(response)