From bec5d5f2079f794789de439ecebee1713d9a96fc Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Mon, 5 Feb 2024 13:15:08 -0500 Subject: [PATCH] use gunicorn to start the server --- relay/application.py | 187 +++++++++++++++++++++++-------------------- relay/logger.py | 14 +--- relay/manage.py | 16 ++-- relay/misc.py | 6 +- requirements.txt | 1 + 5 files changed, 117 insertions(+), 107 deletions(-) diff --git a/relay/application.py b/relay/application.py index 92bccb5..80efed9 100644 --- a/relay/application.py +++ b/relay/application.py @@ -1,15 +1,17 @@ from __future__ import annotations import asyncio -import queue +import os import signal -import threading -import traceback +import subprocess +import sys +import time import typing from aiohttp import web from aputils.signer import Signer from datetime import datetime, timedelta +from gunicorn.app.wsgiapp import WSGIApplication from . import logger as logging from .cache import get_cache @@ -20,6 +22,7 @@ from .misc import check_open_port from .views import VIEWS if typing.TYPE_CHECKING: + from collections.abc import Awaitable from tinysql import Database, Row from typing import Any from .cache import Cache @@ -31,21 +34,24 @@ if typing.TYPE_CHECKING: class Application(web.Application): DEFAULT: Application = None - def __init__(self, cfgpath: str): + def __init__(self, cfgpath: str, gunicorn: bool = False): web.Application.__init__(self) Application.DEFAULT = self + self['proc'] = None self['signer'] = None + self['start_time'] = None + self['config'] = Config(cfgpath, load = True) self['database'] = get_database(self.config) self['client'] = HttpClient() self['cache'] = get_cache(self) - self['workers'] = [] - self['last_worker'] = 0 - self['start_time'] = None - self['running'] = False + if not gunicorn: + return + + self.on_response_prepare.append(handle_access_log) for path, view in VIEWS: self.router.add_view(path, view) @@ -96,17 +102,16 @@ class Application(web.Application): def push_message(self, inbox: str, message: Message, instance: Row) -> None: - if self.config.workers <= 0: - asyncio.ensure_future(self.client.post(inbox, message, instance)) - return + asyncio.ensure_future(self.client.post(inbox, message, instance)) - worker = self['workers'][self['last_worker']] - worker.queue.put((inbox, message, instance)) - self['last_worker'] += 1 + def run(self, dev: bool = False) -> None: + self.start(dev) - if self['last_worker'] >= len(self['workers']): - self['last_worker'] = 0 + while self['proc'] and self['proc'].poll() is None: + time.sleep(0.1) + + self.stop() def set_signal_handler(self, startup: bool) -> None: @@ -119,91 +124,101 @@ class Application(web.Application): pass - def run(self) -> None: - if not check_open_port(self.config.listen, self.config.port): - logging.error('A server is already running on port %i', self.config.port) + + def start(self, dev: bool = False) -> None: + if self['proc']: return - for view in VIEWS: - self.router.add_view(*view) + if not check_open_port(self.config.listen, self.config.port): + logging.error('Server already running on %s:%s', self.config.listen, self.config.port) + return - logging.info( - 'Starting webserver at %s (%s:%i)', - self.config.domain, - self.config.listen, - self.config.port - ) + cmd = [ + sys.executable, '-m', 'gunicorn', + 'relay.application:main_gunicorn', + '--bind', f'{self.config.listen}:{self.config.port}', + '--worker-class', 'aiohttp.GunicornWebWorker', + '--workers', str(self.config.workers), + '--env', f'CONFIG_FILE={self.config.path}' + ] - asyncio.run(self.handle_run()) - - - def stop(self, *_: Any) -> None: - self['running'] = False - - - async def handle_run(self) -> None: - self['running'] = True + if dev: + cmd.append('--reload') self.set_signal_handler(True) - - if self.config.workers > 0: - for _ in range(self.config.workers): - worker = PushWorker(self) - worker.start() - - self['workers'].append(worker) - - runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"') - await runner.setup() - - site = web.TCPSite( - runner, - host = self.config.listen, - port = self.config.port, - reuse_address = True - ) - - await site.start() - self['start_time'] = datetime.now() - - while self['running']: - await asyncio.sleep(0.25) - - await site.stop() - await self.client.close() - - self['start_time'] = None - self['running'] = False - self['workers'].clear() + self['proc'] = subprocess.Popen(cmd) # pylint: disable=consider-using-with -class PushWorker(threading.Thread): + def stop(self, *_) -> None: + if not self['proc']: + return + + self['proc'].terminate() + time_wait = 0.0 + + while self['proc'].poll() is None: + time.sleep(0.1) + time_wait += 0.1 + + if time_wait >= 5.0: + self['proc'].kill() + break + + self.set_signal_handler(False) + self['proc'] = None + + +# not used, but keeping just in case +class GunicornRunner(WSGIApplication): def __init__(self, app: Application): - threading.Thread.__init__(self) self.app = app - self.queue = queue.Queue() - self.client = None + self.app_uri = 'relay.application:main_gunicorn' + self.options = { + 'bind': f'{app.config.listen}:{app.config.port}', + 'worker_class': 'aiohttp.GunicornWebWorker', + 'workers': app.config.workers, + 'raw_env': f'CONFIG_FILE={app.config.path}' + } + + WSGIApplication.__init__(self) - def run(self) -> None: - asyncio.run(self.handle_queue()) + def load_config(self): + for key, value in self.options.items(): + self.cfg.set(key, value) - async def handle_queue(self) -> None: - self.client = HttpClient() + def run(self): + logging.info('Starting webserver for %s', self.app.config.domain) + WSGIApplication.run(self) - while self.app['running']: - try: - inbox, message, instance = self.queue.get(block=True, timeout=0.25) - self.queue.task_done() - logging.verbose('New push from Thread-%i', threading.get_ident()) - await self.client.post(inbox, message, instance) - except queue.Empty: - pass +async def handle_access_log(request: web.Request, response: web.Response) -> None: + address = request.headers.get( + 'X-Forwarded-For', + request.headers.get( + 'X-Real-Ip', + request.remote + ) + ) - ## make sure an exception doesn't bring down the worker - except Exception: - traceback.print_exc() + logging.info( + '%s "%s %s" %i %i "%s"', + address, + request.method, + request.path, + response.status, + len(response.body), + request.headers.get('User-Agent', 'n/a') + ) - await self.client.close() + +async def main_gunicorn(): + try: + app = Application(os.environ['CONFIG_FILE'], gunicorn = True) + + except KeyError: + logging.error('Failed to set "CONFIG_FILE" environment. Trying to run without gunicorn?') + raise + + return app diff --git a/relay/logger.py b/relay/logger.py index 1970cab..8aff62d 100644 --- a/relay/logger.py +++ b/relay/logger.py @@ -70,7 +70,6 @@ error: Callable = logging.error critical: Callable = logging.critical -logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE') env_log_level = os.environ.get('LOG_LEVEL', 'INFO').upper() try: @@ -79,22 +78,15 @@ try: except KeyError: env_log_file = None - -try: - log_level = LogLevel[env_log_level] - -except KeyError: - print('Invalid log level:', env_log_level) - log_level = LogLevel['INFO'] - - handlers = [logging.StreamHandler()] if env_log_file: handlers.append(logging.FileHandler(env_log_file)) +logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE') logging.basicConfig( - level = log_level, + level = LogLevel.INFO, format = '[%(asctime)s] %(levelname)s: %(message)s', + datefmt = '%Y-%m-%d %H:%M:%S', handlers = handlers ) diff --git a/relay/manage.py b/relay/manage.py index df5b4cb..2a78b6f 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -18,7 +18,7 @@ from .application import Application from .compat import RelayConfig, RelayDatabase from .database import get_database from .database.connection import RELAY_SOFTWARE -from .misc import IS_DOCKER, Message, check_open_port +from .misc import IS_DOCKER, Message if typing.TYPE_CHECKING: from tinysql import Row @@ -70,6 +70,11 @@ def cli(ctx: click.Context, config: str) -> None: cli_setup.callback() else: + click.echo( + '[DEPRECATED] Running the relay without the "run" command will be removed in the ' + + 'future.' + ) + cli_run.callback() @@ -200,8 +205,9 @@ def cli_setup(ctx: click.Context) -> None: @cli.command('run') +@click.option('--dev', '-d', is_flag = True, help = 'Enable worker reloading on code change') @click.pass_context -def cli_run(ctx: click.Context) -> None: +def cli_run(ctx: click.Context, dev: bool = False) -> None: 'Run the relay' if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer: @@ -228,11 +234,7 @@ def cli_run(ctx: click.Context) -> None: click.echo(pip_command) return - if not check_open_port(ctx.obj.config.listen, ctx.obj.config.port): - click.echo(f'Error: A server is already running on port {ctx.obj.config.port}') - return - - ctx.obj.run() + ctx.obj.run(dev) @cli.command('convert') diff --git a/relay/misc.py b/relay/misc.py index 2a72f22..e71845d 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -14,9 +14,9 @@ from functools import cached_property from uuid import uuid4 if typing.TYPE_CHECKING: - from collections.abc import Coroutine, Generator + from collections.abc import Awaitable, Coroutine, Generator from tinysql import Connection - from typing import Any, Awaitable + from typing import Any from .application import Application from .cache import Cache from .config import Config @@ -236,7 +236,7 @@ class Response(AiohttpResponse): class View(AbstractView): def __await__(self) -> Generator[Response]: - if (self.request.method) not in METHODS: + if self.request.method not in METHODS: raise HTTPMethodNotAllowed(self.request.method, self.allowed_methods) if not (handler := self.handlers.get(self.request.method)): diff --git a/requirements.txt b/requirements.txt index 979a894..5239cb8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ aiohttp>=3.9.1 aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz click>=8.1.2 +gunicorn==21.1.0 hiredis==2.3.2 pyyaml>=6.0 redis==5.0.1