remove swagger and add auto-docs for api endpoints in /doc

This commit is contained in:
Izalia Mae 2024-10-13 20:37:37 -04:00
parent a25df0ccc4
commit 71e1503542
9 changed files with 462 additions and 1138 deletions

View file

@ -21,6 +21,7 @@ dependencies = [
"barkshark-lib >= 0.2.3, < 0.3.0",
"barkshark-sql >= 0.2.0, < 0.3.0",
"click == 8.1.2",
"docstring-parser == 0.16",
"hiredis == 2.3.2",
"idna == 3.4",
"jinja2-haml == 0.3.5",

View file

@ -9,7 +9,6 @@ import traceback
from Crypto.Random import get_random_bytes
from aiohttp import web
from aiohttp.web import HTTPException, StaticResource
from aiohttp_swagger import setup_swagger
from aputils.signer import Signer
from base64 import b64encode
from blib import File, HttpError, port_check
@ -29,8 +28,6 @@ from .database.schema import Instance
from .http_client import HttpClient
from .misc import JSON_PATHS, TOKEN_PATHS, Message, Response
from .template import Template
from .views import ROUTES
from .views.frontend import handle_frontend_path
from .workers import PushWorkers
@ -82,15 +79,6 @@ class Application(web.Application):
self.cache.setup()
self.on_cleanup.append(handle_cleanup) # type: ignore
for method, path, handler in ROUTES:
self.router.add_route(method, path, handler)
setup_swagger(
self,
ui_version = 3,
swagger_from_file = File.from_resource('relay', 'data/swagger.yaml')
)
@property
def cache(self) -> Cache:
@ -312,7 +300,7 @@ async def handle_response_headers(
app: Application = request.app # type: ignore[assignment]
if request.path == "/" or request.path.startswith(TOKEN_PATHS):
if request.path in {"/", "/docs"} or request.path.startswith(TOKEN_PATHS):
with app.database.session() as conn:
tokens = (
request.headers.get('Authorization', '').replace('Bearer', '').strip(),
@ -369,6 +357,34 @@ async def handle_response_headers(
return resp
@web.middleware
async def handle_frontend_path(
request: web.Request,
handler: Callable[[web.Request], Awaitable[Response]]) -> Response:
if request['user'] is not None and request.path == '/login':
return Response.new_redir('/')
if request.path.startswith(TOKEN_PATHS[:2]) and request['user'] is None:
if request.path == '/logout':
return Response.new_redir('/')
response = Response.new_redir(f'/login?redir={request.path}')
if request['token'] is not None:
response.del_cookie('user-token')
return response
response = await handler(request)
if not request.path.startswith('/api'):
if request['user'] is None and request['token'] is not None:
response.del_cookie('user-token')
return response
async def handle_cleanup(app: Application) -> None:
await app.client.close()
app.cache.close()

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,23 @@
-extends "base.haml"
-set page = "API Documentation"
-block content
-for method in methods
.method.section
%a.endpoint(id="{{method.name.replace('handle_', '')}}" href="#{{method.name.replace('handle_', '')}}")
{{method.method.upper()}} {{method.path}}
%span.description -> =method.docs
-if method.parameters
.parameters
-for param in method.parameters
.parameter.section
.name
%pre << {{param.key}}: {{param.type_str}}
-if not param.has_default
%span.required << *required
.doc -> =param.docs

View file

@ -379,6 +379,56 @@ textarea {
margin: var(--spacing) 0;
}
#content.page-api_documentation .method {
padding: 0px;
}
#content.page-api_documentation .method > * {
padding: 5px;
}
#content.page-api_documentation .method .endpoint {
background-color: #444444;
display: block;
font-weight: bold;
}
#content.page-api_documentation .method .endpoint::after {
content: "\F470";
font-family: "bootstrap-icons";
/* margin-left: 5px; */
vertical-align: middle;
}
#content.page-api_documentation .parameter {
background-color: #222222;
padding: 5px;
}
#content.page-api_documentation .parameter:not(:first-child) {
margin-top: 5px;
margin-bottom: 0px;
}
#content.page-api_documentation .parameter:not(:last-child) {
margin-top: 0px;
margin-bottom: 5px;
}
#content.page-api_documentation .parameter .required {
color: #F66;
font-size: 0.75em;
}
#content.page-api_documentation .parameter .name pre {
color: #6F6;
display: inline-block;
}
#content.page-api_documentation .parameter .name pre, #content.page-api_documentation .parameter .name span {
margin: 0px;
}
@keyframes show_toast {
0% {

View file

@ -19,6 +19,7 @@ from .compat import RelayConfig, RelayDatabase
from .config import Config
from .database import RELAY_SOFTWARE, get_database, schema
from .misc import ACTOR_FORMATS, SOFTWARE, IS_DOCKER, Message
from .views import ROUTES
def check_alphanumeric(text: str) -> str:
@ -179,6 +180,9 @@ def cli_run(ctx: click.Context, dev: bool = False) -> None:
cli_setup.callback() # type: ignore
return
for method, path, handler in ROUTES:
ctx.obj.router.add_route(method, path, handler)
ctx.obj['dev'] = dev
ctx.obj.run()

View file

@ -5,18 +5,16 @@ import traceback
from aiohttp.web import Request
from argon2.exceptions import VerifyMismatchError
from blib import HttpError, HttpMethod, convert_to_boolean
from typing import TYPE_CHECKING, Any
from typing import Any
from urllib.parse import urlparse
from .base import DEFAULT_REDIRECT, Route
from .. import api_objects as objects, __version__
from ..application import Application
from ..database import ConfigData, schema
from ..misc import Message, Response, idna_to_utf
if TYPE_CHECKING:
from ..application import Application
@Route(HttpMethod.GET, "/oauth/authorize", "Authorization", False)
async def handle_authorize_get(
@ -25,6 +23,13 @@ async def handle_authorize_get(
response_type: str,
client_id: str,
redirect_uri: str) -> Response:
"""
Authorize an application. Redirects to the application's redirect URI if accepted.
:param response_type: What to respond with. Should always be set to ``code``.
:param client_id: Application identifier
:param redirect_uri: URI to redirect to on accept
"""
if response_type != "code":
raise HttpError(400, "Response type is not 'code'")
@ -82,7 +87,7 @@ async def handle_authorize_post(
return Response.new_redir("/")
@Route(HttpMethod.POST, "/oauth/token", "Auth", False)
@Route(HttpMethod.POST, "/oauth/token", "Authorization", False)
async def handle_new_token(
app: Application,
request: Request,
@ -91,6 +96,15 @@ async def handle_new_token(
client_id: str,
client_secret: str,
redirect_uri: str) -> objects.Application:
"""
Get a new access token for an application
:param grant_type: Access level for the application. Should be ``authorization_code``
:param code: Authorization code obtained from ``/oauth/authorize``
:param client_id: The application to create the token for
:param client_secret: Secret of the specified application
:param redirect_uri: URI to redirect to
"""
if grant_type != "authorization_code":
raise HttpError(400, "Invalid grant type")
@ -110,13 +124,20 @@ async def handle_new_token(
return objects.Application.from_row(application)
@Route(HttpMethod.POST, "/api/oauth/revoke", "Auth", True)
@Route(HttpMethod.POST, "/api/oauth/revoke", "Authorization", True)
async def handle_token_revoke(
app: Application,
request: Request,
client_id: str,
client_secret: str,
token: str) -> objects.Message:
"""
Revoke and destroy a token
:param client_id: Identifier of the application to revoke
:param client_secret: Secret of the application
:param token: Token associated with the application
"""
with app.database.session(True) as conn:
if (application := conn.get_app(client_id, client_secret, token)) is None:
@ -131,12 +152,20 @@ async def handle_token_revoke(
return objects.Message("Token deleted")
@Route(HttpMethod.POST, "/api/v1/login", "Auth", False)
@Route(HttpMethod.POST, "/api/v1/login", "Authorization", False)
async def handle_login(
app: Application,
request: Request,
username: str,
password: str) -> objects.Application:
"""
Create a new token via username and password.
It is recommended to use oauth instead.
:param username: Name of the user to login
:param password: Password of the user
"""
with app.database.session(True) as s:
if not (user := s.get_user(username)):
@ -155,6 +184,8 @@ async def handle_login(
@Route(HttpMethod.GET, "/api/v1/app", "Application", True)
async def handle_get_app(app: Application, request: Request) -> objects.Application:
"Get data for the application currently in use"
return objects.Application.from_row(request["application"])
@ -165,6 +196,13 @@ async def handle_create_app(
name: str,
redirect_uri: str,
website: str | None = None) -> objects.Application:
"""
Create a new application
:param name: User-readable name of the application
:param redirect_uri: URI to redirect to on authorization
:param website: Homepage of the application
"""
with app.database.session(True) as conn:
application = conn.put_app(
@ -178,12 +216,16 @@ async def handle_create_app(
@Route(HttpMethod.GET, "/api/v1/config", "Config", True)
async def handle_config_get(app: Application, request: Request) -> objects.Config:
"Get all config options"
with app.database.session(False) as conn:
return objects.Config.from_config(conn.get_config_all())
@Route(HttpMethod.GET, "/api/v2/config", "Config", True)
async def handle_config_get_v2(app: Application, request: Request) -> list[objects.ConfigItem]:
"Get all config options including the type name for each"
data: list[objects.ConfigItem] = []
cfg = ConfigData()
user_keys = ConfigData.USER_KEYS()
@ -203,7 +245,14 @@ async def handle_config_get_v2(app: Application, request: Request) -> list[objec
async def handle_config_update(
app: Application,
request: Request,
key: str, value: Any) -> objects.Message:
key: str,
value: Any) -> objects.Message:
"""
Set a value for a config option
:param key: Name of the config option to set
:param value: New value
"""
if (field := ConfigData.FIELD(key)).name not in ConfigData.USER_KEYS():
raise HttpError(400, "Invalid key")
@ -219,6 +268,12 @@ async def handle_config_update(
@Route(HttpMethod.DELETE, "/api/v1/config", "Config", True)
async def handle_config_reset(app: Application, request: Request, key: str) -> objects.Message:
"""
Set a config option to the default value
:param key: Name of the config option to reset
"""
if (field := ConfigData.FIELD(key)).name not in ConfigData.USER_KEYS():
raise HttpError(400, "Invalid key")
@ -233,6 +288,8 @@ async def handle_config_reset(app: Application, request: Request, key: str) -> o
@Route(HttpMethod.GET, "/api/v1/relay", "Misc", False)
async def get(app: Application, request: Request) -> objects.Relay:
"Get info about the relay instance"
with app.database.session() as s:
config = s.get_config_all()
inboxes = [row.domain for row in s.get_inboxes()]
@ -252,6 +309,8 @@ async def get(app: Application, request: Request) -> objects.Relay:
@Route(HttpMethod.GET, "/api/v1/instance", "Instance", True)
async def handle_instances_get(app: Application, request: Request) -> list[objects.Instance]:
"Get all subscribed instances"
data: list[objects.Instance] = []
with app.database.session(False) as s:
@ -269,6 +328,14 @@ async def handle_instance_add(
inbox: str | None = None,
software: str | None = None,
followid: str | None = None) -> objects.Instance:
"""
Add an instance to the database
:param actor: URL of the instance actor to add. Usually ``https://{domain}/actor``.
:param inbox: URL of the inbox for the instance actor
:param software: Name of the server software as displayed in nodeinfo
:param followid: URL to the ``Follow`` activity
"""
domain = idna_to_utf(urlparse(actor).netloc)
@ -313,6 +380,15 @@ async def handle_instance_update(
inbox: str | None = None,
software: str | None = None,
followid: str | None = None) -> objects.Instance:
"""
Update info for an instance
:param domain: Hostname of the instance to modify
:param actor: URL of the instance actor to add. Usually ``https://{domain}/actor``.
:param inbox: URL of the inbox for the instance actor
:param software: Name of the server software as displayed in nodeinfo
:param followid: URL to the ``Follow`` activity
"""
domain = idna_to_utf(domain)
@ -333,6 +409,12 @@ async def handle_instance_update(
@Route(HttpMethod.DELETE, "/api/v1/instance", "Instance", True)
async def handle_instance_del(app: Application, request: Request, domain: str) -> objects.Message:
"""
Remove an instance from the database
:param domain: Hostname of the instance to remove
"""
domain = idna_to_utf(domain)
with app.database.session(False) as s:
@ -346,6 +428,12 @@ async def handle_instance_del(app: Application, request: Request, domain: str) -
@Route(HttpMethod.GET, "/api/v1/request", "Request", True)
async def handle_requests_get(app: Application, request: Request) -> list[objects.Instance]:
"""
Get all follow requests.
This feature only works when ``Approval Required`` is enabled.
"""
data: list[objects.Instance] = []
with app.database.session(False) as s:
@ -361,6 +449,12 @@ async def handle_request_response(
request: Request,
domain: str,
accept: bool) -> objects.Message:
"""
Approve or reject a follow request
:param domain: Hostname of the instance that requested to follow
:param accept: Accept (``True``) or reject (``False``) the request
"""
try:
with app.database.session(True) as conn:
@ -394,6 +488,8 @@ async def handle_request_response(
@Route(HttpMethod.GET, "/api/v1/domain_ban", "Domain Ban", True)
async def handle_domain_bans_get(app: Application, request: Request) -> list[objects.DomainBan]:
"Get all banned domains"
data: list[objects.DomainBan] = []
with app.database.session(False) as s:
@ -410,6 +506,16 @@ async def handle_domain_ban_add(
domain: str,
note: str | None = None,
reason: str | None = None) -> objects.DomainBan:
"""
Ban a domain.
Banned domains cannot follow the relay. Posts originating from a banned instance will be
ignored in a future update.
:param domain: Hostname to ban
:param note: Additional details about the ban that can only be viewed by admins
:param reason: Publicly viewable details for the ban
"""
with app.database.session(False) as s:
if s.get_domain_ban(domain) is not None:
@ -420,12 +526,19 @@ async def handle_domain_ban_add(
@Route(HttpMethod.PATCH, "/api/v1/domain_ban", "Domain Ban", True)
async def handle_domain_ban(
async def handle_domain_ban_update(
app: Application,
request: Request,
domain: str,
note: str | None = None,
reason: str | None = None) -> objects.DomainBan:
"""
Update a domain ban
:param domain: Hostname to ban
:param note: Additional details about the ban that can only be viewed by admins
:param reason: Publicly viewable details for the ban
"""
with app.database.session(True) as s:
if not any([note, reason]):
@ -438,8 +551,14 @@ async def handle_domain_ban(
return objects.DomainBan.from_row(row)
@Route(HttpMethod.PATCH, "/api/v1/domain_ban", "Domain Ban", True)
@Route(HttpMethod.DELETE, "/api/v1/domain_ban", "Domain Ban", True)
async def handle_domain_unban(app: Application, request: Request, domain: str) -> objects.Message:
"""
Unban a domain
:param domain: Hostname to unban
"""
with app.database.session(True) as s:
if s.get_domain_ban(domain) is None:
raise HttpError(404, "Domain not banned")
@ -451,6 +570,8 @@ async def handle_domain_unban(app: Application, request: Request, domain: str) -
@Route(HttpMethod.GET, "/api/v1/software_ban", "Software Ban", True)
async def handle_software_bans_get(app: Application, request: Request) -> list[objects.SoftwareBan]:
"Get all banned software"
data: list[objects.SoftwareBan] = []
with app.database.session(False) as s:
@ -467,6 +588,13 @@ async def handle_software_ban_add(
name: str,
note: str | None = None,
reason: str | None = None) -> objects.SoftwareBan:
"""
Ban all instanstances that use the specified software
:param name: Nodeinfo name of the software to ban
:param note: Additional details about the ban that can only be viewed by admins
:param reason: Publicly viewable details for the ban
"""
with app.database.session(True) as s:
if s.get_software_ban(name) is not None:
@ -483,6 +611,13 @@ async def handle_software_ban(
name: str,
note: str | None = None,
reason: str | None = None) -> objects.SoftwareBan:
"""
Update a software ban
:param name: Nodeinfo name of the software ban to modify
:param note: Additional details about the ban that can only be viewed by admins
:param reason: Publicly viewable details for the ban
"""
with app.database.session(True) as s:
if not any([note, reason]):
@ -497,6 +632,12 @@ async def handle_software_ban(
@Route(HttpMethod.PATCH, "/api/v1/software_ban", "Software Ban", True)
async def handle_software_unban(app: Application, request: Request, name: str) -> objects.Message:
"""
Unban the specified software
:param name: Nodeinfo name of the software to unban
"""
with app.database.session(True) as s:
if s.get_software_ban(name) is None:
raise HttpError(404, "Software not banned")
@ -506,6 +647,58 @@ async def handle_software_unban(app: Application, request: Request, name: str) -
return objects.Message("Unbanned software")
@Route(HttpMethod.GET, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_get(app: Application, request: Request) -> list[objects.Whitelist]:
"""
Get all currently whitelisted domains
"""
data: list[objects.Whitelist] = []
with app.database.session(False) as s:
for row in s.get_domains_whitelist():
data.append(objects.Whitelist.from_row(row))
return data
@Route(HttpMethod.POST, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_add(
app: Application,
request: Request,
domain: str) -> objects.Whitelist:
"""
Add a domain to the whitelist
:param domain: Hostname to allow
"""
with app.database.session(True) as s:
if s.get_domain_whitelist(domain) is not None:
raise HttpError(400, "Domain already added to whitelist")
row = s.put_domain_whitelist(domain)
return objects.Whitelist.from_row(row)
@Route(HttpMethod.DELETE, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_del(app: Application, request: Request, domain: str) -> objects.Message:
"""
Remove a domain from the whitelist
:param domain: Hostname to remove from the whitelist
"""
with app.database.session(True) as s:
if s.get_domain_whitelist(domain) is None:
raise HttpError(404, "Domain not in whitelist")
s.del_domain_whitelist(domain)
return objects.Message("Removed domain from whitelist")
# remove /api/v1/user endpoints?
@Route(HttpMethod.GET, "/api/v1/user", "User", True)
async def handle_users_get(app: Application, request: Request) -> list[objects.User]:
with app.database.session(False) as s:
@ -518,7 +711,7 @@ async def handle_users_get(app: Application, request: Request) -> list[objects.U
@Route(HttpMethod.POST, "/api/v1/user", "User", True)
async def post(
async def handle_user_add(
app: Application,
request: Request,
username: str,
@ -534,7 +727,7 @@ async def post(
@Route(HttpMethod.PATCH, "/api/v1/user", "User", True)
async def patch(
async def handle_user_update(
app: Application,
request: Request,
username: str,
@ -550,7 +743,7 @@ async def patch(
@Route(HttpMethod.DELETE, "/api/v1/user", "User", True)
async def delete(app: Application, request: Request, username: str) -> objects.Message:
async def handle_user_del(app: Application, request: Request, username: str) -> objects.Message:
with app.database.session(True) as s:
if s.get_user(username) is None:
raise HttpError(404, "User does not exist")
@ -558,39 +751,3 @@ async def delete(app: Application, request: Request, username: str) -> objects.M
s.del_user(username)
return objects.Message("Deleted user")
@Route(HttpMethod.GET, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_get(app: Application, request: Request) -> list[objects.Whitelist]:
data: list[objects.Whitelist] = []
with app.database.session(False) as s:
for row in s.get_domains_whitelist():
data.append(objects.Whitelist.from_row(row))
return data
@Route(HttpMethod.POST, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_add(
app: Application,
request: Request,
domain: str) -> objects.Whitelist:
with app.database.session(True) as s:
if s.get_domain_whitelist(domain) is not None:
raise HttpError(400, "Domain already added to whitelist")
row = s.put_domain_whitelist(domain)
return objects.Whitelist.from_row(row)
@Route(HttpMethod.DELETE, "/api/v1/whitelist", "Whitelist", True)
async def handle_whitelist_del(app: Application, request: Request, domain: str) -> objects.Message:
with app.database.session(True) as s:
if s.get_domain_whitelist(domain) is None:
raise HttpError(404, "Domain not in whitelist")
s.del_domain_whitelist(domain)
return objects.Message("Removed domain from whitelist")

View file

@ -1,17 +1,22 @@
from __future__ import annotations
import docstring_parser
import inspect
from aiohttp.web import Request, StreamResponse
from blib import HttpError, HttpMethod
from collections.abc import Awaitable, Callable, Mapping
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass
from json.decoder import JSONDecodeError
from typing import TYPE_CHECKING, Any, overload
from types import GenericAlias, UnionType
from typing import TYPE_CHECKING, Any, cast, get_origin, get_type_hints, overload
from .. import logger as logging
from ..api_objects import ApiObject
from ..application import Application
from ..misc import Response, get_app
if TYPE_CHECKING:
from ..application import Application
try:
from typing import Self
@ -23,6 +28,7 @@ if TYPE_CHECKING:
HandlerCallback = Callable[[Request], Awaitable[Response]]
METHODS: dict[str, Method] = {}
ROUTES: list[tuple[str, str, HandlerCallback]] = []
DEFAULT_REDIRECT: str = 'urn:ietf:wg:oauth:2.0:oob'
@ -33,8 +39,23 @@ ALLOWED_HEADERS: set[str] = {
}
def convert_data(data: Mapping[str, Any]) -> dict[str, str]:
return {key: str(value) for key, value in data.items()}
def parse_docstring(docstring: str) -> tuple[str, dict[str, str]]:
params = {}
ds = docstring_parser.parse(docstring)
for param in ds.params:
params[param.arg_name] = param.description or "n/a"
if not ds.short_description and not ds.long_description:
body = "n/a"
elif ds.long_description is None:
body = cast(str, ds.short_description)
else:
body = "\n".join([ds.short_description, ds.long_description]) # type: ignore[list-item]
return body, params
def register_route(
@ -51,8 +72,114 @@ def register_route(
return wrapper
@dataclass(slots = True, frozen = True)
class Method:
name: str
category: str
docs: str | None
method: HttpMethod
path: str
return_type: type[Any]
parameters: tuple[Parameter, ...]
@classmethod
def parse(
cls: type[Self],
func: ApiRouteHandler,
method: HttpMethod,
path: str,
category: str) -> Self:
annotations = get_type_hints(func)
if (return_type := annotations.get("return")) is None:
raise ValueError(f"Missing return type for {func.__name__}")
if isinstance(return_type, GenericAlias):
return_type = get_origin(return_type)
if not issubclass(return_type, (Response, ApiObject, list)):
raise ValueError(f"Invalid return type '{return_type.__name__}' for {func.__name__}")
args = {key: value for key, value in inspect.signature(func).parameters.items()}
docstring, paramdocs = parse_docstring(func.__doc__ or "")
params = []
if func.__doc__ is None:
logging.warning(f"Missing docstring for '{func.__name__}'")
for key, value in args.items():
types: list[type[Any]] = []
vtype = annotations[key]
if isinstance(vtype, UnionType):
for subtype in vtype.__args__:
if subtype is type(None):
continue
types.append(subtype)
elif vtype.__name__ in {"Application", "Request"}:
continue
else:
types.append(vtype)
params.append(Parameter(
key = key,
docs = paramdocs.get(key, ""),
default = value.default,
types = tuple(types)
))
if not paramdocs.get(key):
logging.warning(f"Missing docs for '{key}' parameter in '{func.__name__}'")
rtype = annotations.get("return") or type(None)
return cls(func.__name__, category, docstring, method, path, rtype, tuple(params))
@dataclass(slots = True, frozen = True)
class Parameter:
key: str
docs: str
default: Any
types: tuple[type[Any], ...]
@property
def has_default(self) -> bool:
# why tf do you make me do this mypy!?
return cast(bool, self.default != inspect.Parameter.empty)
@property
def key_str(self) -> str:
if not self.has_default:
return f"{self.key} *required"
return self.key
@property
def type_str(self) -> str:
return " | ".join(v.__name__ for v in self.types)
def check_types(self, items: Sequence[type[Any]]) -> bool:
for item in items:
if isinstance(item, self.types):
return True
return False
class Route:
handler: ApiRouteHandler
docs: Method
def __init__(self,
method: HttpMethod,
@ -82,6 +209,10 @@ class Route:
if isinstance(obj, Request):
return self.handle_request(obj)
if (self.method, self.path) != (HttpMethod.POST, "/oauth/authorize"):
if self.path != "/api/v1/user":
METHODS[obj.__name__] = Method.parse(obj, self.method, self.path, self.category)
self.handler = obj
return self

View file

@ -1,49 +1,20 @@
from __future__ import annotations
from aiohttp.web import Request, middleware
from aiohttp.web import Request
from blib import HttpMethod
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
from .base import register_route
from .base import METHODS, register_route
from ..database import THEMES
from ..logger import LogLevel
from ..misc import TOKEN_PATHS, Response
from ..misc import Response
if TYPE_CHECKING:
from ..application import Application
@middleware
async def handle_frontend_path(
request: Request,
handler: Callable[[Request], Awaitable[Response]]) -> Response:
if request['user'] is not None and request.path == '/login':
return Response.new_redir('/')
if request.path.startswith(TOKEN_PATHS[:2]) and request['user'] is None:
if request.path == '/logout':
return Response.new_redir('/')
response = Response.new_redir(f'/login?redir={request.path}')
if request['token'] is not None:
response.del_cookie('user-token')
return response
response = await handler(request)
if not request.path.startswith('/api'):
if request['user'] is None and request['token'] is not None:
response.del_cookie('user-token')
return response
@register_route(HttpMethod.GET, "/")
async def handle_home(app: Application, request: Request) -> Response:
with app.database.session() as conn:
@ -54,6 +25,15 @@ async def handle_home(app: Application, request: Request) -> Response:
return Response.new_template(200, "page/home.haml", request, context)
@register_route(HttpMethod.GET, "/docs")
async def handle_api_doc(app: Application, request: Request) -> Response:
context: dict[str, Any] = {
"methods": sorted(METHODS.values(), key = lambda x: x.category)
}
return Response.new_template(200, "page/docs.haml", request, context)
@register_route(HttpMethod.GET, '/login')
async def handle_login(app: Application, request: Request) -> Response:
context = {"redir": unquote(request.query.get("redir", "/"))}