Compare commits

...

7 commits

Author SHA1 Message Date
Izalia Mae 1ced9c2c4e Merge branch 'dev' into 'main'
Draft: version 0.3.3

See merge request pleroma/relay!59
2024-08-27 02:27:26 +00:00
Izalia Mae ebe3337823 remove QueueItem class 2024-08-26 22:27:20 -04:00
Izalia Mae cff3bc30b8 ensure log level for workers gets updated 2024-08-26 22:23:04 -04:00
Izalia Mae 0576933b16 fix linter issue 2024-08-26 22:22:29 -04:00
Izalia Mae 065038cc2a don't use bare except 2024-08-26 22:21:59 -04:00
Izalia Mae d44cee6d01 replace "type: ignore" with typing.cast and add Application.workers property 2024-08-26 22:20:48 -04:00
Izalia Mae 501ac2cabb fix template headers 2024-08-26 21:54:43 -04:00
7 changed files with 33 additions and 31 deletions

View file

@ -18,9 +18,9 @@ from datetime import datetime, timedelta
from mimetypes import guess_type from mimetypes import guess_type
from pathlib import Path from pathlib import Path
from threading import Event, Thread from threading import Event, Thread
from typing import Any from typing import Any, cast
from . import logger as logging, workers from . import logger as logging
from .cache import Cache, get_cache from .cache import Cache, get_cache
from .config import Config from .config import Config
from .database import Connection, get_database from .database import Connection, get_database
@ -32,6 +32,7 @@ from .template import Template
from .views import VIEWS from .views import VIEWS
from .views.api import handle_api_path from .views.api import handle_api_path
from .views.frontend import handle_frontend_path from .views.frontend import handle_frontend_path
from .workers import PushWorkers
def get_csp(request: web.Request) -> str: def get_csp(request: web.Request) -> str:
@ -78,7 +79,7 @@ class Application(web.Application):
self['cache'].setup() self['cache'].setup()
self['template'] = Template(self) self['template'] = Template(self)
self['push_queue'] = multiprocessing.Queue() self['push_queue'] = multiprocessing.Queue()
self['workers'] = workers.PushWorkers(self.config.workers) self['workers'] = PushWorkers(self.config.workers)
self.cache.setup() self.cache.setup()
self.on_cleanup.append(handle_cleanup) # type: ignore self.on_cleanup.append(handle_cleanup) # type: ignore
@ -95,27 +96,27 @@ class Application(web.Application):
@property @property
def cache(self) -> Cache: def cache(self) -> Cache:
return self['cache'] # type: ignore[no-any-return] return cast(Cache, self['cache'])
@property @property
def client(self) -> HttpClient: def client(self) -> HttpClient:
return self['client'] # type: ignore[no-any-return] return cast(HttpClient, self['client'])
@property @property
def config(self) -> Config: def config(self) -> Config:
return self['config'] # type: ignore[no-any-return] return cast(Config, self['config'])
@property @property
def database(self) -> Database[Connection]: def database(self) -> Database[Connection]:
return self['database'] # type: ignore[no-any-return] return cast(Database[Connection], self['database'])
@property @property
def signer(self) -> Signer: def signer(self) -> Signer:
return self['signer'] # type: ignore[no-any-return] return cast(Signer, self['signer'])
@signer.setter @signer.setter
@ -129,7 +130,7 @@ class Application(web.Application):
@property @property
def template(self) -> Template: def template(self) -> Template:
return self['template'] # type: ignore[no-any-return] return cast(Template, self['template'])
@property @property
@ -142,6 +143,11 @@ class Application(web.Application):
return timedelta(seconds=uptime.seconds) return timedelta(seconds=uptime.seconds)
@property
def workers(self) -> PushWorkers:
return cast(PushWorkers, self['workers'])
def push_message(self, inbox: str, message: Message, instance: Instance) -> None: def push_message(self, inbox: str, message: Message, instance: Instance) -> None:
self['workers'].push_message(inbox, message, instance) self['workers'].push_message(inbox, message, instance)

View file

@ -141,6 +141,7 @@ class Connection(SqlConnection):
return self.execute("SELECT * FROM inboxes WHERE accepted = 1").all(schema.Instance) return self.execute("SELECT * FROM inboxes WHERE accepted = 1").all(schema.Instance)
# todo: check if software is different than stored row
def put_inbox(self, def put_inbox(self,
domain: str, domain: str,
inbox: str | None = None, inbox: str | None = None,

View file

@ -10,7 +10,7 @@
%head %head
%title << {{config.name}}: {{page}} %title << {{config.name}}: {{page}}
%meta(charset="UTF-8") %meta(charset="UTF-8")
%meta(name="ort" content="width=device-width, initial-scale=1") %meta(name="viewport" content="width=device-width, initial-scale=1")
%link(rel="stylesheet" type="text/css" href="/theme/{{config.theme}}.css?{{version}}" nonce="{{request['hash']}}" class="theme") %link(rel="stylesheet" type="text/css" href="/theme/{{config.theme}}.css?{{version}}" nonce="{{request['hash']}}" class="theme")
%link(rel="stylesheet" type="text/css" href="/static/style.css?{{version}}" nonce="{{request['hash']}}") %link(rel="stylesheet" type="text/css" href="/static/style.css?{{version}}" nonce="{{request['hash']}}")
%link(rel="stylesheet" type="text/css" href="/static/bootstrap-icons.css?{{version}}" nonce="{{request['hash']}}") %link(rel="stylesheet" type="text/css" href="/static/bootstrap-icons.css?{{version}}" nonce="{{request['hash']}}")

View file

@ -31,7 +31,7 @@ SUPPORTS_HS2019 = {
'sharkey' 'sharkey'
} }
T = TypeVar('T', bound = JsonBase) T = TypeVar('T', bound = JsonBase[Any])
HEADERS = { HEADERS = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': f'ActivityRelay/{__version__}' 'User-Agent': f'ActivityRelay/{__version__}'

View file

@ -366,8 +366,8 @@ def cli_config_set(ctx: click.Context, key: str, value: Any) -> None:
with ctx.obj.database.session() as conn: with ctx.obj.database.session() as conn:
new_value = conn.put_config(key, value) new_value = conn.put_config(key, value)
except: except Exception:
click.echo('Invalid config name:', key) click.echo(f'Invalid config name: {key}')
return return
click.echo(f'{key}: {repr(new_value)}') click.echo(f'{key}: {repr(new_value)}')

View file

@ -276,7 +276,10 @@ class Config(View):
raise HttpError(400, 'Invalid key') raise HttpError(400, 'Invalid key')
with self.database.session() as conn: with self.database.session() as conn:
conn.put_config(data['key'], data['value']) value = conn.put_config(data['key'], data['value'])
if data['key'] == 'log-level':
self.app.workers.set_log_level(value)
return Response.new({'message': 'Updated config'}, ctype = 'json') return Response.new({'message': 'Updated config'}, ctype = 'json')
@ -288,7 +291,10 @@ class Config(View):
raise HttpError(400, 'Invalid key') raise HttpError(400, 'Invalid key')
with self.database.session() as conn: with self.database.session() as conn:
conn.put_config(data['key'], ConfigData.DEFAULT(data['key'])) value = conn.put_config(data['key'], ConfigData.DEFAULT(data['key']))
if data['key'] == 'log-level':
self.app.workers.set_log_level(value)
return Response.new({'message': 'Updated config'}, ctype = 'json') return Response.new({'message': 'Updated config'}, ctype = 'json')

View file

@ -21,12 +21,7 @@ from .misc import IS_WINDOWS, Message, get_app
@dataclass @dataclass
class QueueItem: class PostItem:
pass
@dataclass
class PostItem(QueueItem):
inbox: str inbox: str
message: Message message: Message
instance: Instance | None instance: Instance | None
@ -40,10 +35,10 @@ class PushWorker(Process):
client: HttpClient client: HttpClient
def __init__(self, queue: QueueType[QueueItem], log_level: Synchronized[int]) -> None: def __init__(self, queue: QueueType[PostItem], log_level: Synchronized[int]) -> None:
Process.__init__(self) Process.__init__(self)
self.queue: QueueType[QueueItem] = queue self.queue: QueueType[PostItem] = queue
self.shutdown: EventType = Event() self.shutdown: EventType = Event()
self.path: Path = get_app().config.path self.path: Path = get_app().config.path
self.log_level: Synchronized[int] = log_level self.log_level: Synchronized[int] = log_level
@ -80,9 +75,7 @@ class PushWorker(Process):
self._log_level_changed.clear() self._log_level_changed.clear()
item = self.queue.get(block=True, timeout=0.1) item = self.queue.get(block=True, timeout=0.1)
asyncio.create_task(self.handle_post(item))
if isinstance(item, PostItem):
asyncio.create_task(self.handle_post(item))
except Empty: except Empty:
await asyncio.sleep(0) await asyncio.sleep(0)
@ -113,15 +106,11 @@ class PushWorker(Process):
class PushWorkers(list[PushWorker]): class PushWorkers(list[PushWorker]):
def __init__(self, count: int) -> None: def __init__(self, count: int) -> None:
self.queue: QueueType[QueueItem] = Queue() self.queue: QueueType[PostItem] = Queue()
self._log_level: Synchronized[int] = Value("i", logging.get_level()) self._log_level: Synchronized[int] = Value("i", logging.get_level())
self._count: int = count self._count: int = count
def push_item(self, item: QueueItem) -> None:
self.queue.put(item)
def push_message(self, inbox: str, message: Message, instance: Instance) -> None: def push_message(self, inbox: str, message: Message, instance: Instance) -> None:
self.queue.put(PostItem(inbox, message, instance)) self.queue.put(PostItem(inbox, message, instance))