"""The endpoints for otp_login objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
from __future__ import annotations
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import paginated, parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.confirm_otp_login_data import ConfirmOTPLoginData
from ..models.create_otp_login_data import CreateOTPLoginData
from ..models.otp_login_multi_response import OTPLoginMultiResponse
from ..models.otp_login_response import OTPLoginResponse
from ..models.otp_login_single_response import OTPLoginSingleResponse
from ..models.otp_loginable_user import OTPLoginableUser
from ..models.select_otp_login_data import SelectOTPLoginData
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]
class OTPLoginService(t.Generic[_ClientT]):
__slots__ = ("__client",)
def __init__(self, client: _ClientT) -> None:
self.__client = client
[docs]
def confirm(
self,
json_body: ConfirmOTPLoginData,
*,
otp_id: str,
) -> t.Union[OTPLoginSingleResponse, OTPLoginMultiResponse]:
"""Verify the OTP code.
Verifies the code sent to the user's email. If exactly one active user
matches, returns a login response with an access token and consumes the
session. If multiple users match, returns `{\"type\": \"select\"}` —
use `otp_login.list` and `otp_login.select` to choose. If no users
match, returns 404 (same as wrong code, anti-enumeration). Allows up to
5 attempts before the session is locked.
:param json_body: The body of the request. See
:class:`.ConfirmOTPLoginData` for information about the possible
fields. You can provide this data as a
:class:`.ConfirmOTPLoginData` or as a dictionary.
:param otp_id: The OTP session identifier.
:returns: Either a LoginResponse or a selection marker.
"""
url = "/api/v1/otp_logins/{otpId}/confirm".format(otpId=otp_id)
params = None
with self.__client as client:
resp = client.http.post(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.otp_login_multi_response import OTPLoginMultiResponse
from ..models.otp_login_single_response import (
OTPLoginSingleResponse,
)
return parsers.JsonResponseParser(
parsers.make_union(
parsers.ParserFor.make(OTPLoginSingleResponse),
parsers.ParserFor.make(OTPLoginMultiResponse),
)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def create(
self,
json_body: CreateOTPLoginData,
) -> OTPLoginResponse:
"""Create an OTP login session and send a code to the given email.
First step of the OTP login flow. After creating, call
`otp_login.confirm` with the code from the email. For single-user
matches that returns a login response directly. For multi-user matches,
use `otp_login.list` and `otp_login.select`.
The session is bound to the creating client's IP. A valid proof-of-work
solution is required to prevent automated abuse. Always returns success
regardless of whether matching users exist (anti-enumeration).
:param json_body: The body of the request. See
:class:`.CreateOTPLoginData` for information about the possible
fields. You can provide this data as a :class:`.CreateOTPLoginData`
or as a dictionary.
:returns: An OTP session identifier.
"""
url = "/api/v1/otp_logins"
params = None
with self.__client as client:
resp = client.http.post(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.otp_login_response import OTPLoginResponse
return parsers.JsonResponseParser(
parsers.ParserFor.make(OTPLoginResponse)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
[docs]
def list(
self,
*,
otp_id: str,
page_size: int = 20,
cg_otp_login_code: str,
) -> paginated.Response[OTPLoginableUser]:
"""List users matching this OTP session (paginated).
Only available after `otp_login.confirm` returned `{\"type\":
\"select\"}`. Requires the OTP code in the `Cg-Otp-Login-Code` header
for authentication. Does not consume the session — can be called
repeatedly until the session expires.
Each user includes a `can_login` flag indicating whether they are
allowed to log in directly (users managed by an external system like
LTI/SSO may not be).
:param otp_id: The OTP session identifier.
:param page_size: The size of a single page, maximum is 50.
:returns: Paginated list of matching users.
"""
url = "/api/v1/otp_logins/{otpId}/users/".format(otpId=otp_id)
headers: t.Dict[str, str] = {
"cg-otp-login-code": str(cg_otp_login_code),
}
params: t.Dict[str, str | int | bool] = {
"page-size": page_size,
}
if t.TYPE_CHECKING:
import httpx
def do_request(next_token: str | None) -> httpx.Response:
if next_token is None:
params.pop("next-token", "")
else:
params["next-token"] = next_token
with self.__client as client:
resp = client.http.get(url=url, headers=headers, params=params)
utils.log_warnings(resp)
return resp
def parse_response(
resp: httpx.Response,
) -> t.Sequence[OTPLoginableUser]:
if utils.response_code_matches(resp.status_code, 200):
from ..models.otp_loginable_user import OTPLoginableUser
return parsers.JsonResponseParser(
rqa.List(parsers.ParserFor.make(OTPLoginableUser))
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)
return paginated.Response(do_request, parse_response)
[docs]
def select(
self,
json_body: SelectOTPLoginData,
*,
otp_id: str,
) -> OTPLoginSingleResponse:
"""Select a user from the multi-user list and log in.
Only available after `otp_login.confirm` returned `{\"type\":
\"select\"}`. Requires the OTP code for re-authentication. Consumes the
session — a second call will return 404.
:param json_body: The body of the request. See
:class:`.SelectOTPLoginData` for information about the possible
fields. You can provide this data as a :class:`.SelectOTPLoginData`
or as a dictionary.
:param otp_id: The OTP session identifier.
:returns: A login response for the selected user.
"""
url = "/api/v1/otp_logins/{otpId}/select".format(otpId=otp_id)
params = None
with self.__client as client:
resp = client.http.post(
url=url, json=utils.to_dict(json_body), params=params
)
utils.log_warnings(resp)
if utils.response_code_matches(resp.status_code, 200):
from ..models.otp_login_single_response import (
OTPLoginSingleResponse,
)
return parsers.JsonResponseParser(
parsers.ParserFor.make(OTPLoginSingleResponse)
).try_parse(resp)
from ..models.any_error import AnyError
raise utils.get_error(
resp,
(
(
(400, 409, 401, 403, 404, 429, 500),
utils.unpack_union(AnyError),
),
),
)