mirror of
https://git.pleroma.social/pleroma/relay.git
synced 2024-11-14 03:27:59 +00:00
Compare commits
1 commit
1ced9c2c4e
...
0f5b0d37e6
Author | SHA1 | Date | |
---|---|---|---|
0f5b0d37e6 |
|
@ -18,9 +18,9 @@ from datetime import datetime, timedelta
|
|||
from mimetypes import guess_type
|
||||
from pathlib import Path
|
||||
from threading import Event, Thread
|
||||
from typing import Any, cast
|
||||
from typing import Any
|
||||
|
||||
from . import logger as logging
|
||||
from . import logger as logging, workers
|
||||
from .cache import Cache, get_cache
|
||||
from .config import Config
|
||||
from .database import Connection, get_database
|
||||
|
@ -32,7 +32,6 @@ from .template import Template
|
|||
from .views import VIEWS
|
||||
from .views.api import handle_api_path
|
||||
from .views.frontend import handle_frontend_path
|
||||
from .workers import PushWorkers
|
||||
|
||||
|
||||
def get_csp(request: web.Request) -> str:
|
||||
|
@ -79,7 +78,7 @@ class Application(web.Application):
|
|||
self['cache'].setup()
|
||||
self['template'] = Template(self)
|
||||
self['push_queue'] = multiprocessing.Queue()
|
||||
self['workers'] = PushWorkers(self.config.workers)
|
||||
self['workers'] = workers.PushWorkers(self.config.workers)
|
||||
|
||||
self.cache.setup()
|
||||
self.on_cleanup.append(handle_cleanup) # type: ignore
|
||||
|
@ -96,27 +95,27 @@ class Application(web.Application):
|
|||
|
||||
@property
|
||||
def cache(self) -> Cache:
|
||||
return cast(Cache, self['cache'])
|
||||
return self['cache'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@property
|
||||
def client(self) -> HttpClient:
|
||||
return cast(HttpClient, self['client'])
|
||||
return self['client'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@property
|
||||
def config(self) -> Config:
|
||||
return cast(Config, self['config'])
|
||||
return self['config'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@property
|
||||
def database(self) -> Database[Connection]:
|
||||
return cast(Database[Connection], self['database'])
|
||||
return self['database'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@property
|
||||
def signer(self) -> Signer:
|
||||
return cast(Signer, self['signer'])
|
||||
return self['signer'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@signer.setter
|
||||
|
@ -130,7 +129,7 @@ class Application(web.Application):
|
|||
|
||||
@property
|
||||
def template(self) -> Template:
|
||||
return cast(Template, self['template'])
|
||||
return self['template'] # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@property
|
||||
|
@ -143,11 +142,6 @@ class Application(web.Application):
|
|||
return timedelta(seconds=uptime.seconds)
|
||||
|
||||
|
||||
@property
|
||||
def workers(self) -> PushWorkers:
|
||||
return cast(PushWorkers, self['workers'])
|
||||
|
||||
|
||||
def push_message(self, inbox: str, message: Message, instance: Instance) -> None:
|
||||
self['workers'].push_message(inbox, message, instance)
|
||||
|
||||
|
|
|
@ -141,7 +141,6 @@ class Connection(SqlConnection):
|
|||
return self.execute("SELECT * FROM inboxes WHERE accepted = 1").all(schema.Instance)
|
||||
|
||||
|
||||
# todo: check if software is different than stored row
|
||||
def put_inbox(self,
|
||||
domain: str,
|
||||
inbox: str | None = None,
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
%head
|
||||
%title << {{config.name}}: {{page}}
|
||||
%meta(charset="UTF-8")
|
||||
%meta(name="viewport" content="width=device-width, initial-scale=1")
|
||||
%meta(name="ort" content="width=device-width, initial-scale=1")
|
||||
%link(rel="stylesheet" type="text/css" href="/theme/{{config.theme}}.css?{{version}}" nonce="{{request['hash']}}" class="theme")
|
||||
%link(rel="stylesheet" type="text/css" href="/static/style.css?{{version}}" nonce="{{request['hash']}}")
|
||||
%link(rel="stylesheet" type="text/css" href="/static/bootstrap-icons.css?{{version}}" nonce="{{request['hash']}}")
|
||||
|
|
|
@ -31,7 +31,7 @@ SUPPORTS_HS2019 = {
|
|||
'sharkey'
|
||||
}
|
||||
|
||||
T = TypeVar('T', bound = JsonBase[Any])
|
||||
T = TypeVar('T', bound = JsonBase)
|
||||
HEADERS = {
|
||||
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
|
||||
'User-Agent': f'ActivityRelay/{__version__}'
|
||||
|
|
|
@ -366,8 +366,8 @@ def cli_config_set(ctx: click.Context, key: str, value: Any) -> None:
|
|||
with ctx.obj.database.session() as conn:
|
||||
new_value = conn.put_config(key, value)
|
||||
|
||||
except Exception:
|
||||
click.echo(f'Invalid config name: {key}')
|
||||
except:
|
||||
click.echo('Invalid config name:', key)
|
||||
return
|
||||
|
||||
click.echo(f'{key}: {repr(new_value)}')
|
||||
|
|
|
@ -276,10 +276,7 @@ class Config(View):
|
|||
raise HttpError(400, 'Invalid key')
|
||||
|
||||
with self.database.session() as conn:
|
||||
value = conn.put_config(data['key'], data['value'])
|
||||
|
||||
if data['key'] == 'log-level':
|
||||
self.app.workers.set_log_level(value)
|
||||
conn.put_config(data['key'], data['value'])
|
||||
|
||||
return Response.new({'message': 'Updated config'}, ctype = 'json')
|
||||
|
||||
|
@ -291,10 +288,7 @@ class Config(View):
|
|||
raise HttpError(400, 'Invalid key')
|
||||
|
||||
with self.database.session() as conn:
|
||||
value = conn.put_config(data['key'], ConfigData.DEFAULT(data['key']))
|
||||
|
||||
if data['key'] == 'log-level':
|
||||
self.app.workers.set_log_level(value)
|
||||
conn.put_config(data['key'], ConfigData.DEFAULT(data['key']))
|
||||
|
||||
return Response.new({'message': 'Updated config'}, ctype = 'json')
|
||||
|
||||
|
|
|
@ -21,7 +21,12 @@ from .misc import IS_WINDOWS, Message, get_app
|
|||
|
||||
|
||||
@dataclass
|
||||
class PostItem:
|
||||
class QueueItem:
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class PostItem(QueueItem):
|
||||
inbox: str
|
||||
message: Message
|
||||
instance: Instance | None
|
||||
|
@ -35,10 +40,10 @@ class PushWorker(Process):
|
|||
client: HttpClient
|
||||
|
||||
|
||||
def __init__(self, queue: QueueType[PostItem], log_level: Synchronized[int]) -> None:
|
||||
def __init__(self, queue: QueueType[QueueItem], log_level: Synchronized[int]) -> None:
|
||||
Process.__init__(self)
|
||||
|
||||
self.queue: QueueType[PostItem] = queue
|
||||
self.queue: QueueType[QueueItem] = queue
|
||||
self.shutdown: EventType = Event()
|
||||
self.path: Path = get_app().config.path
|
||||
self.log_level: Synchronized[int] = log_level
|
||||
|
@ -75,7 +80,9 @@ class PushWorker(Process):
|
|||
self._log_level_changed.clear()
|
||||
|
||||
item = self.queue.get(block=True, timeout=0.1)
|
||||
asyncio.create_task(self.handle_post(item))
|
||||
|
||||
if isinstance(item, PostItem):
|
||||
asyncio.create_task(self.handle_post(item))
|
||||
|
||||
except Empty:
|
||||
await asyncio.sleep(0)
|
||||
|
@ -106,11 +113,15 @@ class PushWorker(Process):
|
|||
|
||||
class PushWorkers(list[PushWorker]):
|
||||
def __init__(self, count: int) -> None:
|
||||
self.queue: QueueType[PostItem] = Queue()
|
||||
self.queue: QueueType[QueueItem] = Queue()
|
||||
self._log_level: Synchronized[int] = Value("i", logging.get_level())
|
||||
self._count: int = count
|
||||
|
||||
|
||||
def push_item(self, item: QueueItem) -> None:
|
||||
self.queue.put(item)
|
||||
|
||||
|
||||
def push_message(self, inbox: str, message: Message, instance: Instance) -> None:
|
||||
self.queue.put(PostItem(inbox, message, instance))
|
||||
|
||||
|
|
Loading…
Reference in a new issue