Compare commits

...

2 commits

Author SHA1 Message Date
Izalia Mae 06f35541e5 Merge branch 'cache' into 'master'
Draft: caching

See merge request pleroma/relay!54
2024-02-05 18:15:16 +00:00
Izalia Mae bec5d5f207 use gunicorn to start the server 2024-02-05 13:15:08 -05:00
5 changed files with 117 additions and 107 deletions

View file

@ -1,15 +1,17 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import queue import os
import signal import signal
import threading import subprocess
import traceback import sys
import time
import typing import typing
from aiohttp import web from aiohttp import web
from aputils.signer import Signer from aputils.signer import Signer
from datetime import datetime, timedelta from datetime import datetime, timedelta
from gunicorn.app.wsgiapp import WSGIApplication
from . import logger as logging from . import logger as logging
from .cache import get_cache from .cache import get_cache
@ -20,6 +22,7 @@ from .misc import check_open_port
from .views import VIEWS from .views import VIEWS
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from collections.abc import Awaitable
from tinysql import Database, Row from tinysql import Database, Row
from typing import Any from typing import Any
from .cache import Cache from .cache import Cache
@ -31,21 +34,24 @@ if typing.TYPE_CHECKING:
class Application(web.Application): class Application(web.Application):
DEFAULT: Application = None DEFAULT: Application = None
def __init__(self, cfgpath: str): def __init__(self, cfgpath: str, gunicorn: bool = False):
web.Application.__init__(self) web.Application.__init__(self)
Application.DEFAULT = self Application.DEFAULT = self
self['proc'] = None
self['signer'] = None self['signer'] = None
self['start_time'] = None
self['config'] = Config(cfgpath, load = True) self['config'] = Config(cfgpath, load = True)
self['database'] = get_database(self.config) self['database'] = get_database(self.config)
self['client'] = HttpClient() self['client'] = HttpClient()
self['cache'] = get_cache(self) self['cache'] = get_cache(self)
self['workers'] = [] if not gunicorn:
self['last_worker'] = 0 return
self['start_time'] = None
self['running'] = False self.on_response_prepare.append(handle_access_log)
for path, view in VIEWS: for path, view in VIEWS:
self.router.add_view(path, view) self.router.add_view(path, view)
@ -96,17 +102,16 @@ class Application(web.Application):
def push_message(self, inbox: str, message: Message, instance: Row) -> None: def push_message(self, inbox: str, message: Message, instance: Row) -> None:
if self.config.workers <= 0: asyncio.ensure_future(self.client.post(inbox, message, instance))
asyncio.ensure_future(self.client.post(inbox, message, instance))
return
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message, instance))
self['last_worker'] += 1 def run(self, dev: bool = False) -> None:
self.start(dev)
if self['last_worker'] >= len(self['workers']): while self['proc'] and self['proc'].poll() is None:
self['last_worker'] = 0 time.sleep(0.1)
self.stop()
def set_signal_handler(self, startup: bool) -> None: def set_signal_handler(self, startup: bool) -> None:
@ -119,91 +124,101 @@ class Application(web.Application):
pass pass
def run(self) -> None:
if not check_open_port(self.config.listen, self.config.port): def start(self, dev: bool = False) -> None:
logging.error('A server is already running on port %i', self.config.port) if self['proc']:
return return
for view in VIEWS: if not check_open_port(self.config.listen, self.config.port):
self.router.add_view(*view) logging.error('Server already running on %s:%s', self.config.listen, self.config.port)
return
logging.info( cmd = [
'Starting webserver at %s (%s:%i)', sys.executable, '-m', 'gunicorn',
self.config.domain, 'relay.application:main_gunicorn',
self.config.listen, '--bind', f'{self.config.listen}:{self.config.port}',
self.config.port '--worker-class', 'aiohttp.GunicornWebWorker',
) '--workers', str(self.config.workers),
'--env', f'CONFIG_FILE={self.config.path}'
]
asyncio.run(self.handle_run()) if dev:
cmd.append('--reload')
def stop(self, *_: Any) -> None:
self['running'] = False
async def handle_run(self) -> None:
self['running'] = True
self.set_signal_handler(True) self.set_signal_handler(True)
self['proc'] = subprocess.Popen(cmd) # pylint: disable=consider-using-with
if self.config.workers > 0:
for _ in range(self.config.workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(
runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['start_time'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
await self.client.close()
self['start_time'] = None
self['running'] = False
self['workers'].clear()
class PushWorker(threading.Thread): def stop(self, *_) -> None:
if not self['proc']:
return
self['proc'].terminate()
time_wait = 0.0
while self['proc'].poll() is None:
time.sleep(0.1)
time_wait += 0.1
if time_wait >= 5.0:
self['proc'].kill()
break
self.set_signal_handler(False)
self['proc'] = None
# not used, but keeping just in case
class GunicornRunner(WSGIApplication):
def __init__(self, app: Application): def __init__(self, app: Application):
threading.Thread.__init__(self)
self.app = app self.app = app
self.queue = queue.Queue() self.app_uri = 'relay.application:main_gunicorn'
self.client = None self.options = {
'bind': f'{app.config.listen}:{app.config.port}',
'worker_class': 'aiohttp.GunicornWebWorker',
'workers': app.config.workers,
'raw_env': f'CONFIG_FILE={app.config.path}'
}
WSGIApplication.__init__(self)
def run(self) -> None: def load_config(self):
asyncio.run(self.handle_queue()) for key, value in self.options.items():
self.cfg.set(key, value)
async def handle_queue(self) -> None: def run(self):
self.client = HttpClient() logging.info('Starting webserver for %s', self.app.config.domain)
WSGIApplication.run(self)
while self.app['running']:
try:
inbox, message, instance = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose('New push from Thread-%i', threading.get_ident())
await self.client.post(inbox, message, instance)
except queue.Empty: async def handle_access_log(request: web.Request, response: web.Response) -> None:
pass address = request.headers.get(
'X-Forwarded-For',
request.headers.get(
'X-Real-Ip',
request.remote
)
)
## make sure an exception doesn't bring down the worker logging.info(
except Exception: '%s "%s %s" %i %i "%s"',
traceback.print_exc() address,
request.method,
request.path,
response.status,
len(response.body),
request.headers.get('User-Agent', 'n/a')
)
await self.client.close()
async def main_gunicorn():
try:
app = Application(os.environ['CONFIG_FILE'], gunicorn = True)
except KeyError:
logging.error('Failed to set "CONFIG_FILE" environment. Trying to run without gunicorn?')
raise
return app

View file

@ -70,7 +70,6 @@ error: Callable = logging.error
critical: Callable = logging.critical critical: Callable = logging.critical
logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE')
env_log_level = os.environ.get('LOG_LEVEL', 'INFO').upper() env_log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
try: try:
@ -79,22 +78,15 @@ try:
except KeyError: except KeyError:
env_log_file = None env_log_file = None
try:
log_level = LogLevel[env_log_level]
except KeyError:
print('Invalid log level:', env_log_level)
log_level = LogLevel['INFO']
handlers = [logging.StreamHandler()] handlers = [logging.StreamHandler()]
if env_log_file: if env_log_file:
handlers.append(logging.FileHandler(env_log_file)) handlers.append(logging.FileHandler(env_log_file))
logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE')
logging.basicConfig( logging.basicConfig(
level = log_level, level = LogLevel.INFO,
format = '[%(asctime)s] %(levelname)s: %(message)s', format = '[%(asctime)s] %(levelname)s: %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
handlers = handlers handlers = handlers
) )

View file

@ -18,7 +18,7 @@ from .application import Application
from .compat import RelayConfig, RelayDatabase from .compat import RelayConfig, RelayDatabase
from .database import get_database from .database import get_database
from .database.connection import RELAY_SOFTWARE from .database.connection import RELAY_SOFTWARE
from .misc import IS_DOCKER, Message, check_open_port from .misc import IS_DOCKER, Message
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from tinysql import Row from tinysql import Row
@ -70,6 +70,11 @@ def cli(ctx: click.Context, config: str) -> None:
cli_setup.callback() cli_setup.callback()
else: else:
click.echo(
'[DEPRECATED] Running the relay without the "run" command will be removed in the ' +
'future.'
)
cli_run.callback() cli_run.callback()
@ -200,8 +205,9 @@ def cli_setup(ctx: click.Context) -> None:
@cli.command('run') @cli.command('run')
@click.option('--dev', '-d', is_flag = True, help = 'Enable worker reloading on code change')
@click.pass_context @click.pass_context
def cli_run(ctx: click.Context) -> None: def cli_run(ctx: click.Context, dev: bool = False) -> None:
'Run the relay' 'Run the relay'
if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer: if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer:
@ -228,11 +234,7 @@ def cli_run(ctx: click.Context) -> None:
click.echo(pip_command) click.echo(pip_command)
return return
if not check_open_port(ctx.obj.config.listen, ctx.obj.config.port): ctx.obj.run(dev)
click.echo(f'Error: A server is already running on port {ctx.obj.config.port}')
return
ctx.obj.run()
@cli.command('convert') @cli.command('convert')

View file

@ -14,9 +14,9 @@ from functools import cached_property
from uuid import uuid4 from uuid import uuid4
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from collections.abc import Coroutine, Generator from collections.abc import Awaitable, Coroutine, Generator
from tinysql import Connection from tinysql import Connection
from typing import Any, Awaitable from typing import Any
from .application import Application from .application import Application
from .cache import Cache from .cache import Cache
from .config import Config from .config import Config
@ -236,7 +236,7 @@ class Response(AiohttpResponse):
class View(AbstractView): class View(AbstractView):
def __await__(self) -> Generator[Response]: def __await__(self) -> Generator[Response]:
if (self.request.method) not in METHODS: if self.request.method not in METHODS:
raise HTTPMethodNotAllowed(self.request.method, self.allowed_methods) raise HTTPMethodNotAllowed(self.request.method, self.allowed_methods)
if not (handler := self.handlers.get(self.request.method)): if not (handler := self.handlers.get(self.request.method)):

View file

@ -1,6 +1,7 @@
aiohttp>=3.9.1 aiohttp>=3.9.1
aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz
click>=8.1.2 click>=8.1.2
gunicorn==21.1.0
hiredis==2.3.2 hiredis==2.3.2
pyyaml>=6.0 pyyaml>=6.0
redis==5.0.1 redis==5.0.1