replace pathlib.Path with blib.File

This commit is contained in:
Izalia Mae 2025-02-16 20:03:02 -05:00
parent c2fd96b758
commit fd680dfd0b
10 changed files with 63 additions and 99 deletions

View file

@ -10,12 +10,11 @@ from aiohttp import web
from aiohttp.web import HTTPException, StaticResource
from aputils.signer import Signer
from base64 import b64encode
from blib import File, HttpError, port_check
from blib import File, HttpError, Path, port_check
from bsql import Database
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
from mimetypes import guess_type
from pathlib import Path
from threading import Event, Thread
from typing import TYPE_CHECKING, Any, cast
@ -65,7 +64,7 @@ class Application(web.Application):
return cls.DEFAULT
def __init__(self, cfgpath: Path | None, dev: bool = False):
def __init__(self, cfgpath: File | str | None, dev: bool = False):
web.Application.__init__(self,
middlewares = [
handle_response_headers, # type: ignore[list-item]
@ -157,7 +156,7 @@ class Application(web.Application):
else:
static = CachedStaticResource(
"/static", Path(File.from_resource("relay", "frontend/static"))
"/static", File(File.from_resource("relay", "frontend/static"))
)
self.router.register_resource(static)
@ -241,27 +240,30 @@ class Application(web.Application):
class CachedStaticResource(StaticResource):
def __init__(self, prefix: str, path: Path):
def __init__(self, prefix: str, path: File):
StaticResource.__init__(self, prefix, path)
self.cache: dict[str, bytes] = {}
for filename in path.rglob("*"):
if filename.is_dir():
for filename in path.glob(recursive = True):
if filename.isdir:
continue
rel_path = str(filename.relative_to(path))
rel_path = filename.relative_to(path)
with filename.open("rb") as fd:
logging.debug("Loading static resource \"%s\"", rel_path)
self.cache[rel_path] = fd.read()
self.cache[str(rel_path)] = fd.read()
async def _handle(self, request: web.Request) -> web.StreamResponse:
rel_url = request.match_info["filename"]
rel_url = str(Path(request.match_info["filename"], True))
if Path(rel_url).anchor:
raise web.HTTPForbidden()
if rel_url.startswith("/"):
if len(rel_url) < 2:
raise web.HTTPForbidden()
rel_url = rel_url[1:]
try:
return web.Response(

View file

@ -4,9 +4,9 @@ import click
import json
import multiprocessing
from blib import File
from collections.abc import Callable
from functools import update_wrapper
from pathlib import Path
from typing import Concatenate, ParamSpec, TypeVar
from .. import __version__
@ -19,24 +19,24 @@ R = TypeVar("R")
@click.group("cli", context_settings = {"show_default": True})
@click.option("--config", "-c", type = Path, help = "path to the relay config")
@click.option("--config", "-c", type = File, help = "path to the relay config")
@click.version_option(version = __version__, prog_name = "ActivityRelay")
@click.pass_context
def cli(ctx: click.Context, config: Path | None) -> None:
def cli(ctx: click.Context, config: File | None) -> None:
if IS_DOCKER:
config = Path("/data/relay.yaml")
config = File("/data/relay.yaml")
# The database was named "relay.jsonld" even though it"s an sqlite file. Fix it.
db = Path("/data/relay.sqlite3")
wrongdb = Path("/data/relay.jsonld")
db = File("/data/relay.sqlite3")
wrongdb = File("/data/relay.jsonld")
if wrongdb.exists() and not db.exists():
if wrongdb.exists and not db.exists:
try:
with wrongdb.open("rb") as fd:
json.load(fd)
except json.JSONDecodeError:
wrongdb.rename(db)
wrongdb.move(db)
ctx.obj = Application(config)

View file

@ -1,7 +1,7 @@
import aputils
import click
from pathlib import Path
from blib import File
from shutil import copyfile
from . import cli, pass_app
@ -28,10 +28,10 @@ def check_alphanumeric(text: str) -> str:
def cli_convert(app: Application, old_config: str) -> None:
"Convert an old config and jsonld database to the new format."
old_config = str(Path(old_config).expanduser().resolve()) if old_config else str(app.config.path)
backup = app.config.path.parent.joinpath(f"{app.config.path.stem}.backup.yaml")
old_config = File(old_config).resolve() if old_config else app.config.path
backup = app.config.path.parent.join(f"{app.config.path.stem}.backup.yaml")
if str(old_config) == str(app.config.path) and not backup.exists():
if str(old_config) == str(app.config.path) and not backup.exists:
logging.info("Created backup config @ %s", backup)
copyfile(app.config.path, backup)

View file

@ -2,21 +2,20 @@ import json
import os
import yaml
from blib import convert_to_boolean
from blib import File, convert_to_boolean
from functools import cached_property
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
class RelayConfig(dict[str, Any]):
def __init__(self, path: str):
def __init__(self, path: File | str):
dict.__init__(self, {})
if self.is_docker:
path = "/data/config.yaml"
self._path = Path(path).expanduser().resolve()
self._path = File(path).resolve()
self.reset()
@ -36,8 +35,8 @@ class RelayConfig(dict[str, Any]):
@property
def db(self) -> Path:
return Path(self["db"]).expanduser().resolve()
def db(self) -> File:
return File(self["db"]).resolve()
@property
@ -63,7 +62,7 @@ class RelayConfig(dict[str, Any]):
def reset(self) -> None:
self.clear()
self.update({
"db": str(self._path.parent.joinpath(f"{self._path.stem}.jsonld")),
"db": self._path.parent.join(f"{self._path.stem}.jsonld"),
"listen": "0.0.0.0",
"port": 8080,
"note": "Make a note about your instance here.",
@ -91,7 +90,7 @@ class RelayConfig(dict[str, Any]):
pass
try:
with self._path.open("r", encoding = "UTF-8") as fd:
with self._path.open("r") as fd:
config = yaml.load(fd, **options)
except FileNotFoundError:
@ -172,5 +171,5 @@ class RelayDatabase(dict[str, Any]):
pass
except json.decoder.JSONDecodeError as e:
if self.config.db.stat().st_size > 0:
if self.config.db.size > 0:
raise e from None

View file

@ -5,8 +5,8 @@ import os
import platform
import yaml
from blib import File
from dataclasses import asdict, dataclass, fields
from pathlib import Path
from platformdirs import user_config_dir
from typing import TYPE_CHECKING, Any
@ -63,8 +63,8 @@ class Config:
rd_prefix: str = "activityrelay"
def __init__(self, path: Path | None = None, load: bool = False):
self.path: Path = Config.get_config_dir(path)
def __init__(self, path: File | str | None = None, load: bool = False):
self.path: File = Config.get_config_dir(path)
self.reset()
if load:
@ -90,32 +90,31 @@ class Config:
@staticmethod
def get_config_dir(path: Path | str | None = None) -> Path:
if isinstance(path, str):
path = Path(path)
def get_config_dir(path: File | str | None = None) -> File:
if path is not None:
return path.expanduser().resolve()
return File(path).resolve()
paths = (
Path("relay.yaml").resolve(),
Path(user_config_dir("activityrelay"), "relay.yaml"),
Path("/etc/activityrelay/relay.yaml")
File("relay.yaml").resolve(),
File(user_config_dir("activityrelay")).join("relay.yaml"),
File("/etc/activityrelay/relay.yaml")
)
for cfgfile in paths:
if cfgfile.exists():
if cfgfile.exists:
return cfgfile
return paths[0]
@property
def sqlite_path(self) -> Path:
if not os.path.isabs(self.sq_path):
return self.path.parent.joinpath(self.sq_path).resolve()
def sqlite_path(self) -> File:
path = File(self.sq_path)
return Path(self.sq_path).expanduser().resolve()
if not path.isabsolute:
return self.path.parent.join(self.sq_path)
return path.resolve()
@property
@ -143,7 +142,7 @@ class Config:
except AttributeError:
pass
with self.path.open("r", encoding = "UTF-8") as fd:
with self.path.open("r") as fd:
config = yaml.load(fd, **options)
if not config:
@ -185,7 +184,7 @@ class Config:
def save(self) -> None:
self.path.parent.mkdir(exist_ok = True, parents = True)
self.path.parent.mkdir()
data: dict[str, Any] = {}
@ -215,7 +214,7 @@ class Config:
data[key] = value
with self.path.open("w", encoding = "utf-8") as fd:
with self.path.open("w") as fd:
yaml.dump(data, fd, sort_keys = False)

View file

@ -1,2 +0,0 @@
class EmptyBodyError(Exception):
pass

View file

@ -10,7 +10,6 @@ from typing import TYPE_CHECKING, Any, TypeVar, overload
from . import __version__, logger as logging
from .cache import Cache
from .database.schema import Instance
from .errors import EmptyBodyError
from .misc import MIMETYPES, Message, get_app
if TYPE_CHECKING:
@ -40,6 +39,10 @@ SUPPORTS_HS2019 = {
}
class EmptyBodyError(Exception):
pass
class HttpClient:
def __init__(self, limit: int = 100, timeout: int = 10):
self.limit = limit

View file

@ -3,16 +3,9 @@ from __future__ import annotations
import logging
import os
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Protocol
if TYPE_CHECKING:
try:
from typing import Self
except ImportError:
from typing_extensions import Self
from blib import File
from blib import IntEnum
from typing import Any, Protocol
class LoggingMethod(Protocol):
@ -32,35 +25,6 @@ class LogLevel(IntEnum):
return self.name
@classmethod
def parse(cls: type[Self], data: Any) -> Self:
try:
data = int(data)
except ValueError:
pass
if isinstance(data, cls):
return data
if isinstance(data, str):
data = data.upper()
try:
return cls[data]
except KeyError:
pass
try:
return cls(data)
except ValueError:
pass
raise AttributeError(f"Invalid enum property for {cls.__name__}: {data}")
def get_level() -> LogLevel:
return LogLevel.parse(logging.root.level)
@ -84,7 +48,7 @@ critical: LoggingMethod = logging.critical
try:
env_log_file: Path | None = Path(os.environ["LOG_FILE"]).expanduser().resolve()
env_log_file: File | None = File(os.environ["LOG_FILE"]).resolve()
except KeyError:
env_log_file = None

View file

@ -35,7 +35,7 @@ class Template(Environment):
],
loader = FileSystemLoader([
File.from_resource("relay", "frontend"),
app.config.path.parent.joinpath("template")
app.config.path.parent.join("template")
])
)

View file

@ -5,13 +5,12 @@ import traceback
from aiohttp.client_exceptions import ClientConnectionError, ClientSSLError
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
from blib import HttpError
from blib import File, HttpError
from dataclasses import dataclass
from multiprocessing import Event, Process, Queue, Value
from multiprocessing.queues import Queue as QueueType
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event as EventType
from pathlib import Path
from queue import Empty
from urllib.parse import urlparse
@ -41,7 +40,7 @@ class PushWorker(Process):
self.queue: QueueType[PostItem] = queue
self.shutdown: EventType = Event()
self.path: Path = get_app().config.path
self.path: File = get_app().config.path
self.log_level: Synchronized[int] = log_level
self._log_level_changed: EventType = Event()