mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
Change ApiStars (Start 5.9.0)
This commit is contained in:
parent
c80ce08299
commit
c86504836a
15 changed files with 150 additions and 73 deletions
|
@ -5,7 +5,7 @@
|
|||
|
||||
[tool.poetry]
|
||||
name = "royalnet"
|
||||
version = "5.8.16"
|
||||
version = "5.9.0"
|
||||
description = "A multipurpose bot and web framework"
|
||||
authors = ["Stefano Pigozzi <ste.pigozzi@gmail.com>"]
|
||||
license = "AGPL-3.0+"
|
||||
|
|
|
@ -11,16 +11,22 @@ class ApiLoginRoyalnetStar(ApiStar):
|
|||
|
||||
methods = ["POST"]
|
||||
|
||||
summary = "Login as a Royalnet user, creating a 7-day login token."
|
||||
|
||||
parameters = {
|
||||
"post": {
|
||||
"username": "The name of the user you are logging in as.",
|
||||
"password": "The password of the user you are logging in as."
|
||||
"password": "The password of the user you are logging in as.",
|
||||
}
|
||||
}
|
||||
|
||||
tags = ["login"]
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
async def post(self, data: ApiData) -> ru.JSON:
|
||||
"""Login with a Royalnet account.
|
||||
|
||||
The method returns a API token valid for 7 days that identifies and authenticates the user to the API.
|
||||
|
||||
Keep it secret, as it is basically a password!
|
||||
"""
|
||||
TokenT = self.alchemy.get(Token)
|
||||
UserT = self.alchemy.get(User)
|
||||
|
||||
|
|
|
@ -6,11 +6,12 @@ import royalnet.utils as ru
|
|||
class ApiRoyalnetVersionStar(ApiStar):
|
||||
path = "/api/royalnet/version/v1"
|
||||
|
||||
summary = "Get the current Royalnet version."
|
||||
methods = ["GET"]
|
||||
|
||||
tags = ["royalnet"]
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
async def get(self, data: ApiData) -> ru.JSON:
|
||||
"""Get the current Royalnet version."""
|
||||
return {
|
||||
"semantic": rv.semantic
|
||||
}
|
||||
|
|
|
@ -10,17 +10,18 @@ class ApiTokenCreateStar(ApiStar):
|
|||
|
||||
methods = ["POST"]
|
||||
|
||||
summary = "Create a new login token of any duration."
|
||||
|
||||
parameters = {
|
||||
"post": {
|
||||
"duration": "The duration in seconds of the new token."
|
||||
}
|
||||
}
|
||||
|
||||
tags = ["token"]
|
||||
tags = ["login"]
|
||||
|
||||
requires_auth = True
|
||||
async def post(self, data: ApiData) -> ru.JSON:
|
||||
"""Create a new login token for the authenticated user.
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
Keep it secret, as it is basically a password!"""
|
||||
user = await data.user()
|
||||
try:
|
||||
duration = int(data["duration"])
|
||||
|
|
|
@ -5,12 +5,9 @@ from royalnet.constellation.api import *
|
|||
class ApiTokenInfoStar(ApiStar):
|
||||
path = "/api/token/info/v1"
|
||||
|
||||
summary = "Get info the current login token."
|
||||
tags = ["login"]
|
||||
|
||||
tags = ["token"]
|
||||
|
||||
requires_auth = True
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
async def get(self, data: ApiData) -> ru.JSON:
|
||||
"""Get information about the current login token."""
|
||||
token = await data.token()
|
||||
return token.json()
|
||||
|
|
|
@ -11,17 +11,18 @@ class ApiTokenPasswdStar(ApiStar):
|
|||
|
||||
methods = ["PUT"]
|
||||
|
||||
summary = "Change Royalnet password for an user."
|
||||
|
||||
tags = ["token"]
|
||||
|
||||
parameters = {
|
||||
"put": {
|
||||
"new_password": "The password you want to set."
|
||||
}
|
||||
}
|
||||
|
||||
requires_auth = True
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
async def put(self, data: ApiData) -> ru.JSON:
|
||||
"""Change the password of the currently logged in user."""
|
||||
TokenT = self.alchemy.get(Token)
|
||||
token = await data.token()
|
||||
user = token.user
|
||||
|
|
|
@ -6,15 +6,16 @@ from royalnet.constellation.api import *
|
|||
class ApiUserFindStar(ApiStar):
|
||||
path = "/api/user/find/v1"
|
||||
|
||||
summary = "Find a Royalnet user by one of their aliases."
|
||||
|
||||
tags = ["user"]
|
||||
|
||||
parameters = {
|
||||
"get": {
|
||||
"alias": "One of the aliases of the user to get."
|
||||
}
|
||||
}
|
||||
|
||||
async def api(self, data: ApiData) -> dict:
|
||||
async def get(self, data: ApiData) -> dict:
|
||||
"""Get details about the Royalnet user with a certain alias."""
|
||||
user = await User.find(self.alchemy, data.session, data["alias"])
|
||||
if user is None:
|
||||
raise NotFoundError("No such user.")
|
||||
|
|
|
@ -6,15 +6,16 @@ from royalnet.constellation.api import *
|
|||
class ApiUserGetStar(ApiStar):
|
||||
path = "/api/user/get/v1"
|
||||
|
||||
summary = "Get a Royalnet user by its id."
|
||||
|
||||
parameters = {
|
||||
"get": {
|
||||
"id": "The id of the user to get."
|
||||
}
|
||||
}
|
||||
|
||||
tags = ["user"]
|
||||
|
||||
async def api(self, data: ApiData) -> dict:
|
||||
async def get(self, data: ApiData) -> dict:
|
||||
"""Get details about the Royalnet user with a certain id."""
|
||||
user_id_str = data["id"]
|
||||
try:
|
||||
user_id = int(user_id_str)
|
||||
|
|
|
@ -7,10 +7,9 @@ from royalnet.constellation.api import *
|
|||
class ApiUserListStar(ApiStar):
|
||||
path = "/api/user/list/v1"
|
||||
|
||||
summary = "Get a list of all registered users."
|
||||
|
||||
tags = ["user"]
|
||||
|
||||
async def api(self, data: ApiData) -> JSON:
|
||||
async def get(self, data: ApiData) -> JSON:
|
||||
"Get a list of all Royalnet users."
|
||||
users: typing.List[User] = await asyncify(data.session.query(self.alchemy.get(User)).all)
|
||||
return [user.json() for user in users]
|
||||
|
|
|
@ -10,11 +10,10 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class ApiData(dict):
|
||||
def __init__(self, data, star, method):
|
||||
def __init__(self, data, star):
|
||||
super().__init__(data)
|
||||
self.star = star
|
||||
self._session = None
|
||||
self.method = method
|
||||
|
||||
def __missing__(self, key):
|
||||
raise MissingParameterError(f"Missing '{key}'")
|
||||
|
|
|
@ -8,18 +8,18 @@ from .jsonapi import api_error, api_success
|
|||
from .apidata import ApiData
|
||||
from .apierrors import *
|
||||
import royalnet.utils as ru
|
||||
import logging
|
||||
import re
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ApiStar(PageStar, ABC):
|
||||
summary: str = ""
|
||||
|
||||
description: str = ""
|
||||
|
||||
parameters: Dict[str, str] = {}
|
||||
parameters: Dict[str, Dict[str, str]] = {}
|
||||
|
||||
tags: List[str] = []
|
||||
|
||||
requires_auth: bool = False
|
||||
deprecated: bool = False
|
||||
|
||||
async def page(self, request: Request) -> JSONResponse:
|
||||
if request.query_params:
|
||||
|
@ -29,9 +29,21 @@ class ApiStar(PageStar, ABC):
|
|||
data = await request.json()
|
||||
except JSONDecodeError:
|
||||
data = {}
|
||||
apidata = ApiData(data=data, star=self, method=request.method)
|
||||
apidata = ApiData(data=data, star=self)
|
||||
|
||||
method = request.method.lower()
|
||||
|
||||
try:
|
||||
response = await self.api(apidata)
|
||||
if method == "get":
|
||||
response = await self.get(apidata)
|
||||
elif method == "post":
|
||||
response = await self.post(apidata)
|
||||
elif method == "put":
|
||||
response = await self.put(apidata)
|
||||
elif method == "delete":
|
||||
response = await self.delete(apidata)
|
||||
else:
|
||||
raise MethodNotImplementedError("Unknown method")
|
||||
except UnauthorizedError as e:
|
||||
return api_error(e, code=401)
|
||||
except NotFoundError as e:
|
||||
|
@ -50,34 +62,70 @@ class ApiStar(PageStar, ABC):
|
|||
finally:
|
||||
await apidata.session_close()
|
||||
|
||||
async def api(self, data: ApiData) -> ru.JSON:
|
||||
async def get(self, data: ApiData) -> ru.JSON:
|
||||
raise MethodNotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def swagger(cls) -> ru.JSON:
|
||||
async def post(self, data: ApiData) -> ru.JSON:
|
||||
raise MethodNotImplementedError()
|
||||
|
||||
async def put(self, data: ApiData) -> ru.JSON:
|
||||
raise MethodNotImplementedError()
|
||||
|
||||
async def delete(self, data: ApiData) -> ru.JSON:
|
||||
raise MethodNotImplementedError()
|
||||
|
||||
def __swagger_for_a_method(self, method: Callable) -> ru.JSON:
|
||||
docstring = method.__doc__ or ""
|
||||
if docstring is None:
|
||||
log.error("Python was started with -OO, so docstrings are disabled and a summary can't be generated.")
|
||||
summary = ""
|
||||
description = ""
|
||||
else:
|
||||
summary, description = re.match(r"^(.*)(?:\n{2,}((?:.|\n)*))?", docstring).groups()
|
||||
|
||||
return {
|
||||
"operationId": f"{self.__class__.__name__}_{method.__name__}",
|
||||
"summary": ru.strip_tabs(summary) if summary is not None else None,
|
||||
"description": ru.strip_tabs(description) if description is not None else None,
|
||||
"tags": self.tags,
|
||||
"security": [{"RoyalnetLoginToken": ["logged_in"]}],
|
||||
"parameters": [{
|
||||
"name": parameter_name,
|
||||
"in": "query",
|
||||
"description": ru.strip_tabs(self.parameters[method.__name__][parameter_name]),
|
||||
"type": "string",
|
||||
} for parameter_name in self.parameters.get(method.__name__, [])]
|
||||
}
|
||||
|
||||
def swagger(self) -> ru.JSON:
|
||||
"""Generate one or more swagger paths for this ApiStar."""
|
||||
result = {}
|
||||
for method in cls.methods:
|
||||
result[method.lower()] = {
|
||||
"operationId": cls.__name__,
|
||||
"summary": cls.summary,
|
||||
"description": cls.description,
|
||||
"responses": {
|
||||
"200": {"description": "Success"},
|
||||
"400": {"description": "Bad request"},
|
||||
"403": {"description": "Forbidden"},
|
||||
"404": {"description": "Not found"},
|
||||
"500": {"description": "Serverside unhandled exception"},
|
||||
"501": {"description": "Not yet implemented"}
|
||||
},
|
||||
"tags": cls.tags,
|
||||
"parameters": [{
|
||||
"name": parameter,
|
||||
"in": "query",
|
||||
"description": cls.parameters[parameter],
|
||||
"type": "string"
|
||||
} for parameter in cls.parameters]
|
||||
}
|
||||
if cls.requires_auth:
|
||||
result[method.lower()]["security"] = [{"RoyalnetLoginToken": ["logged_in"]}]
|
||||
for method in self.methods:
|
||||
result[method.lower()] = self.__swagger_for_a_method(self.__getattribute__(method.lower()))
|
||||
return result
|
||||
|
||||
# result = {}
|
||||
# for method in cls.methods:
|
||||
# result[method.lower()] = {
|
||||
# "operationId": cls.__name__,
|
||||
# "summary": cls.summary,
|
||||
# "description": cls.description,
|
||||
# "responses": {
|
||||
# "200": {"description": "Success"},
|
||||
# "400": {"description": "Bad request"},
|
||||
# "403": {"description": "Forbidden"},
|
||||
# "404": {"description": "Not found"},
|
||||
# "500": {"description": "Serverside unhandled exception"},
|
||||
# "501": {"description": "Not yet implemented"}
|
||||
# },
|
||||
# "tags": cls.tags,
|
||||
# "parameters": [{
|
||||
# "name": parameter,
|
||||
# "in": "query",
|
||||
# "description": cls.parameters[parameter],
|
||||
# "type": "string"
|
||||
# } for parameter in cls.parameters]
|
||||
# }
|
||||
# if cls.requires_auth:
|
||||
# result[method.lower()]["security"] = [{"RoyalnetLoginToken": ["logged_in"]}]
|
||||
# return result
|
||||
|
|
|
@ -132,7 +132,6 @@ class Constellation:
|
|||
else:
|
||||
self.register_page_stars(page_stars, pack_cfg)
|
||||
log.info(f"PageStars: {len(self.starlette.routes)} stars")
|
||||
log.info(f"ExceptionStars: {len(self.starlette.exception_handlers)} stars")
|
||||
|
||||
self.running: bool = False
|
||||
"""Is the :class:`Constellation` server currently running?"""
|
||||
|
|
|
@ -6,6 +6,7 @@ from .multilock import MultiLock
|
|||
from .sentry import init_sentry, sentry_exc, sentry_wrap, sentry_async_wrap
|
||||
from .log import init_logging
|
||||
from .royaltyping import JSON
|
||||
from .strip_tabs import strip_tabs
|
||||
|
||||
__all__ = [
|
||||
"asyncify",
|
||||
|
@ -24,4 +25,5 @@ __all__ = [
|
|||
"sentry_async_wrap",
|
||||
"init_logging",
|
||||
"JSON",
|
||||
"strip_tabs",
|
||||
]
|
||||
|
|
22
royalnet/utils/strip_tabs.py
Normal file
22
royalnet/utils/strip_tabs.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
import re
|
||||
|
||||
|
||||
def strip_tabs(s: str) -> str:
|
||||
# https://github.com/Steffo99/bluelib/blob/pls-work/src/utils/stripTabs.js
|
||||
|
||||
indent_regex = re.compile(r"^[ \t]+")
|
||||
|
||||
rows = list(filter(lambda r: r != "", s.split("\n")))
|
||||
|
||||
match = None
|
||||
for row in rows:
|
||||
match = re.match(indent_regex, row)
|
||||
if match is not None:
|
||||
break
|
||||
|
||||
if match is None:
|
||||
start = 0
|
||||
else:
|
||||
start = len(match.group(0))
|
||||
|
||||
return "\n".join(map(lambda r: r[start:], rows))
|
|
@ -1 +1 @@
|
|||
semantic = "5.8.16"
|
||||
semantic = "5.9.0"
|
||||
|
|
Loading…
Reference in a new issue