diff --git a/docs/commands.md b/docs/commands.md index f437c8f..e28acbe 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -9,7 +9,7 @@ use `python3 -m relay` if installed via pip or `~/.local/bin/activityrelay` if i ## Run -Run the relay. Optionally add `-d` or `--dev` to enable auto-reloading on code changes. +Run the relay. activityrelay run diff --git a/docs/configuration.md b/docs/configuration.md index ec8212f..2fad0af 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -19,10 +19,10 @@ proxy is on the same host. port: 8080 -### Web Workers +### Push Workers -The number of processes to spawn for handling web requests. Leave it at 0 to automatically detect -how many processes should be spawned. +The number of processes to spawn for pushing messages to subscribed instances. Leave it at 0 to +automatically detect how many processes should be spawned. workers: 0 diff --git a/relay/application.py b/relay/application.py index cafef59..02fe73c 100644 --- a/relay/application.py +++ b/relay/application.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import multiprocessing import os import signal import subprocess @@ -12,6 +13,7 @@ from aiohttp import web from aiohttp_swagger import setup_swagger from aputils.signer import Signer from datetime import datetime, timedelta +from queue import Empty from threading import Event, Thread from . import logger as logging @@ -40,7 +42,7 @@ if typing.TYPE_CHECKING: class Application(web.Application): DEFAULT: Application = None - def __init__(self, cfgpath: str, gunicorn: bool = False): + def __init__(self, cfgpath: str): web.Application.__init__(self, middlewares = [ handle_api_path @@ -49,7 +51,7 @@ class Application(web.Application): Application.DEFAULT = self - self['proc'] = None + self['running'] = None self['signer'] = None self['start_time'] = None self['cleanup_thread'] = None @@ -58,9 +60,8 @@ class Application(web.Application): self['database'] = get_database(self.config) self['client'] = HttpClient() self['cache'] = get_cache(self) - - if not gunicorn: - return + self['push_queue'] = multiprocessing.Queue() + self['workers'] = [] self.on_response_prepare.append(handle_access_log) self.on_cleanup.append(handle_cleanup) @@ -119,16 +120,18 @@ class Application(web.Application): def push_message(self, inbox: str, message: Message, instance: Row) -> None: - asyncio.ensure_future(self.client.post(inbox, message, instance)) + self['push_queue'].put((inbox, message, instance)) - def run(self, dev: bool = False) -> None: - self.start(dev) + def run(self) -> None: + if self["running"]: + return - while self['proc'] and self['proc'].poll() is None: - time.sleep(0.1) + if not check_open_port(self.config.listen, self.config.port): + return logging.error(f'A server is already running on port {self.config.port}') - self.stop() + logging.info(f'Starting webserver at {self.config.domain} ({self.config.listen}:{self.config.port})') + asyncio.run(self.handle_run()) def set_signal_handler(self, startup: bool) -> None: @@ -141,56 +144,54 @@ class Application(web.Application): pass + def stop(self, *_): + self['running'] = False - def start(self, dev: bool = False) -> None: - if self['proc']: - return - if not check_open_port(self.config.listen, self.config.port): - logging.error('Server already running on %s:%s', self.config.listen, self.config.port) - return - - cmd = [ - sys.executable, '-m', 'gunicorn', - 'relay.application:main_gunicorn', - '--bind', f'{self.config.listen}:{self.config.port}', - '--worker-class', 'aiohttp.GunicornWebWorker', - '--workers', str(self.config.workers), - '--env', f'CONFIG_FILE={self.config.path}', - '--reload-extra-file', pkgfiles('relay').joinpath('data', 'swagger.yaml'), - '--reload-extra-file', pkgfiles('relay').joinpath('data', 'statements.sql') - ] - - if dev: - cmd.append('--reload') + async def handle_run(self): + self['running'] = True self.set_signal_handler(True) - self['proc'] = subprocess.Popen(cmd) # pylint: disable=consider-using-with + + self['database'].connect() + self['cache'].setup() self['cleanup_thread'] = CacheCleanupThread(self) self['cleanup_thread'].start() + for i in range(self.config.workers): + worker = PushWorker(self['push_queue']) + worker.start() - def stop(self, *_) -> None: - if not self['proc']: - return + self['workers'].append(worker) - self['cleanup_thread'].stop() - self['proc'].terminate() - time_wait = 0.0 + runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"') + await runner.setup() - while self['proc'].poll() is None: - time.sleep(0.1) - time_wait += 0.1 + site = web.TCPSite(runner, + host = self.config.listen, + port = self.config.port, + reuse_address = True + ) - if time_wait >= 5.0: - self['proc'].kill() - break + await site.start() + self['starttime'] = datetime.now() + + while self['running']: + await asyncio.sleep(0.25) + + await site.stop() + + for worker in self['workers']: + worker.stop() self.set_signal_handler(False) - self['proc'] = None - self.cache.close() - self.database.disconnect() + self['starttime'] = None + self['running'] = False + self['cleanup_thread'].stop() + self['workers'].clear() + self['database'].disconnect() + self['cache'].close() class CacheCleanupThread(Thread): @@ -217,6 +218,40 @@ class CacheCleanupThread(Thread): self.running.clear() +class PushWorker(multiprocessing.Process): + def __init__(self, queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.queue = queue + self.shutdown = multiprocessing.Event() + + + def stop(self) -> None: + self.shutdown.set() + + + def run(self) -> None: + asyncio.run(self.handle_queue()) + + + async def handle_queue(self) -> None: + client = HttpClient() + + while not self.shutdown.is_set(): + try: + inbox, message, instance = self.queue.get(block=True, timeout=0.25) + await client.post(inbox, message, instance) + + except Empty: + pass + + ## make sure an exception doesn't bring down the worker + except Exception: + traceback.print_exc() + + await client.close() + + + async def handle_access_log(request: web.Request, response: web.Response) -> None: address = request.headers.get( 'X-Forwarded-For', @@ -241,14 +276,3 @@ async def handle_cleanup(app: Application) -> None: await app.client.close() app.cache.close() app.database.disconnect() - - -async def main_gunicorn(): - try: - app = Application(os.environ['CONFIG_FILE'], gunicorn = True) - - except KeyError: - logging.error('Failed to set "CONFIG_FILE" environment. Trying to run without gunicorn?') - raise RuntimeError from None - - return app diff --git a/relay/cache.py b/relay/cache.py index 3f258eb..448267f 100644 --- a/relay/cache.py +++ b/relay/cache.py @@ -91,7 +91,6 @@ class Cache(ABC): def __init__(self, app: Application): self.app = app - self.setup() @abstractmethod @@ -158,8 +157,8 @@ class SqlCache(Cache): def __init__(self, app: Application): - self._db = get_database(app.config) Cache.__init__(self, app) + self._db = None def get(self, namespace: str, key: str) -> Item: @@ -232,6 +231,10 @@ class SqlCache(Cache): def setup(self) -> None: + if self._db.connected: + return + + self._db = get_database(self.app.config) self._db.connect() with self._db.session(True) as conn: @@ -247,7 +250,11 @@ class SqlCache(Cache): @register_cache class RedisCache(Cache): name: str = 'redis' - _rd: Redis + + + def __init__(self, app: Application): + Cache.__init__(self, app) + self._rd = None @property @@ -322,6 +329,9 @@ class RedisCache(Cache): def setup(self) -> None: + if self._rd: + return + options = { 'client_name': f'ActivityRelay_{self.app.config.domain}', 'decode_responses': True, diff --git a/relay/manage.py b/relay/manage.py index 3acd1c2..8066d79 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -10,7 +10,6 @@ import sys import typing from aputils.signer import Signer -from gunicorn.app.wsgiapp import WSGIApplication from pathlib import Path from shutil import copyfile from urllib.parse import urlparse @@ -208,9 +207,8 @@ def cli_setup(ctx: click.Context) -> None: @cli.command('run') -@click.option('--dev', '-d', is_flag = True, help = 'Enable worker reloading on code change') @click.pass_context -def cli_run(ctx: click.Context, dev: bool = False) -> None: +def cli_run(ctx: click.Context) -> None: 'Run the relay' if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer: @@ -237,23 +235,12 @@ def cli_run(ctx: click.Context, dev: bool = False) -> None: click.echo(pip_command) return - if getattr(sys, 'frozen', False): - subprocess.run([sys.executable, 'run-gunicorn'], check = False) - - else: - ctx.obj.run(dev) + ctx.obj.run() # todo: figure out why the relay doesn't quit properly without this os._exit(0) -@cli.command('run-gunicorn') -@click.pass_context -def cli_run_gunicorn(ctx: click.Context) -> None: - runner = GunicornRunner(ctx.obj) - runner.run() - - @cli.command('convert') @click.option('--old-config', '-o', help = 'Path to the config file to convert from') @@ -921,30 +908,6 @@ def cli_whitelist_import(ctx: click.Context) -> None: click.echo('Imported whitelist from inboxes') -class GunicornRunner(WSGIApplication): - def __init__(self, app: Application): - self.app = app - self.app_uri = 'relay.application:main_gunicorn' - self.options = { - 'bind': f'{app.config.listen}:{app.config.port}', - 'worker_class': 'aiohttp.GunicornWebWorker', - 'workers': app.config.workers, - 'raw_env': f'CONFIG_FILE={app.config.path}' - } - - WSGIApplication.__init__(self) - - - def load_config(self): - for key, value in self.options.items(): - self.cfg.set(key, value) - - - def run(self): - logging.info('Starting webserver for %s', self.app.config.domain) - WSGIApplication.run(self) - - def main() -> None: # pylint: disable=no-value-for-parameter diff --git a/requirements.txt b/requirements.txt index 65b70b0..aea9c24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ aiohttp-swagger[performance]==1.0.16 aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz argon2-cffi==23.1.0 click>=8.1.2 -gunicorn==21.1.0 hiredis==2.3.2 pyyaml>=6.0 redis==5.0.1