replace gunicorn with push worker processes

This commit is contained in:
Izalia Mae 2024-02-22 11:54:09 -05:00
parent a2ae1bdd21
commit 001aa87667
6 changed files with 101 additions and 105 deletions

View file

@ -9,7 +9,7 @@ use `python3 -m relay` if installed via pip or `~/.local/bin/activityrelay` if i
## Run ## Run
Run the relay. Optionally add `-d` or `--dev` to enable auto-reloading on code changes. Run the relay.
activityrelay run activityrelay run

View file

@ -19,10 +19,10 @@ proxy is on the same host.
port: 8080 port: 8080
### Web Workers ### Push Workers
The number of processes to spawn for handling web requests. Leave it at 0 to automatically detect The number of processes to spawn for pushing messages to subscribed instances. Leave it at 0 to
how many processes should be spawned. automatically detect how many processes should be spawned.
workers: 0 workers: 0

View file

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import multiprocessing
import os import os
import signal import signal
import subprocess import subprocess
@ -12,6 +13,7 @@ from aiohttp import web
from aiohttp_swagger import setup_swagger from aiohttp_swagger import setup_swagger
from aputils.signer import Signer from aputils.signer import Signer
from datetime import datetime, timedelta from datetime import datetime, timedelta
from queue import Empty
from threading import Event, Thread from threading import Event, Thread
from . import logger as logging from . import logger as logging
@ -40,7 +42,7 @@ if typing.TYPE_CHECKING:
class Application(web.Application): class Application(web.Application):
DEFAULT: Application = None DEFAULT: Application = None
def __init__(self, cfgpath: str, gunicorn: bool = False): def __init__(self, cfgpath: str):
web.Application.__init__(self, web.Application.__init__(self,
middlewares = [ middlewares = [
handle_api_path handle_api_path
@ -49,7 +51,7 @@ class Application(web.Application):
Application.DEFAULT = self Application.DEFAULT = self
self['proc'] = None self['running'] = None
self['signer'] = None self['signer'] = None
self['start_time'] = None self['start_time'] = None
self['cleanup_thread'] = None self['cleanup_thread'] = None
@ -58,9 +60,8 @@ class Application(web.Application):
self['database'] = get_database(self.config) self['database'] = get_database(self.config)
self['client'] = HttpClient() self['client'] = HttpClient()
self['cache'] = get_cache(self) self['cache'] = get_cache(self)
self['push_queue'] = multiprocessing.Queue()
if not gunicorn: self['workers'] = []
return
self.on_response_prepare.append(handle_access_log) self.on_response_prepare.append(handle_access_log)
self.on_cleanup.append(handle_cleanup) self.on_cleanup.append(handle_cleanup)
@ -119,16 +120,18 @@ class Application(web.Application):
def push_message(self, inbox: str, message: Message, instance: Row) -> None: def push_message(self, inbox: str, message: Message, instance: Row) -> None:
asyncio.ensure_future(self.client.post(inbox, message, instance)) self['push_queue'].put((inbox, message, instance))
def run(self, dev: bool = False) -> None: def run(self) -> None:
self.start(dev) if self["running"]:
return
while self['proc'] and self['proc'].poll() is None: if not check_open_port(self.config.listen, self.config.port):
time.sleep(0.1) return logging.error(f'A server is already running on port {self.config.port}')
self.stop() logging.info(f'Starting webserver at {self.config.domain} ({self.config.listen}:{self.config.port})')
asyncio.run(self.handle_run())
def set_signal_handler(self, startup: bool) -> None: def set_signal_handler(self, startup: bool) -> None:
@ -141,56 +144,54 @@ class Application(web.Application):
pass pass
def stop(self, *_):
self['running'] = False
def start(self, dev: bool = False) -> None:
if self['proc']:
return
if not check_open_port(self.config.listen, self.config.port): async def handle_run(self):
logging.error('Server already running on %s:%s', self.config.listen, self.config.port) self['running'] = True
return
cmd = [
sys.executable, '-m', 'gunicorn',
'relay.application:main_gunicorn',
'--bind', f'{self.config.listen}:{self.config.port}',
'--worker-class', 'aiohttp.GunicornWebWorker',
'--workers', str(self.config.workers),
'--env', f'CONFIG_FILE={self.config.path}',
'--reload-extra-file', pkgfiles('relay').joinpath('data', 'swagger.yaml'),
'--reload-extra-file', pkgfiles('relay').joinpath('data', 'statements.sql')
]
if dev:
cmd.append('--reload')
self.set_signal_handler(True) self.set_signal_handler(True)
self['proc'] = subprocess.Popen(cmd) # pylint: disable=consider-using-with
self['database'].connect()
self['cache'].setup()
self['cleanup_thread'] = CacheCleanupThread(self) self['cleanup_thread'] = CacheCleanupThread(self)
self['cleanup_thread'].start() self['cleanup_thread'].start()
for i in range(self.config.workers):
worker = PushWorker(self['push_queue'])
worker.start()
def stop(self, *_) -> None: self['workers'].append(worker)
if not self['proc']:
return
self['cleanup_thread'].stop() runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
self['proc'].terminate() await runner.setup()
time_wait = 0.0
while self['proc'].poll() is None: site = web.TCPSite(runner,
time.sleep(0.1) host = self.config.listen,
time_wait += 0.1 port = self.config.port,
reuse_address = True
)
if time_wait >= 5.0: await site.start()
self['proc'].kill() self['starttime'] = datetime.now()
break
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
for worker in self['workers']:
worker.stop()
self.set_signal_handler(False) self.set_signal_handler(False)
self['proc'] = None
self.cache.close() self['starttime'] = None
self.database.disconnect() self['running'] = False
self['cleanup_thread'].stop()
self['workers'].clear()
self['database'].disconnect()
self['cache'].close()
class CacheCleanupThread(Thread): class CacheCleanupThread(Thread):
@ -217,6 +218,40 @@ class CacheCleanupThread(Thread):
self.running.clear() self.running.clear()
class PushWorker(multiprocessing.Process):
def __init__(self, queue: multiprocessing.Queue):
multiprocessing.Process.__init__(self)
self.queue = queue
self.shutdown = multiprocessing.Event()
def stop(self) -> None:
self.shutdown.set()
def run(self) -> None:
asyncio.run(self.handle_queue())
async def handle_queue(self) -> None:
client = HttpClient()
while not self.shutdown.is_set():
try:
inbox, message, instance = self.queue.get(block=True, timeout=0.25)
await client.post(inbox, message, instance)
except Empty:
pass
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
await client.close()
async def handle_access_log(request: web.Request, response: web.Response) -> None: async def handle_access_log(request: web.Request, response: web.Response) -> None:
address = request.headers.get( address = request.headers.get(
'X-Forwarded-For', 'X-Forwarded-For',
@ -241,14 +276,3 @@ async def handle_cleanup(app: Application) -> None:
await app.client.close() await app.client.close()
app.cache.close() app.cache.close()
app.database.disconnect() app.database.disconnect()
async def main_gunicorn():
try:
app = Application(os.environ['CONFIG_FILE'], gunicorn = True)
except KeyError:
logging.error('Failed to set "CONFIG_FILE" environment. Trying to run without gunicorn?')
raise RuntimeError from None
return app

View file

@ -91,7 +91,6 @@ class Cache(ABC):
def __init__(self, app: Application): def __init__(self, app: Application):
self.app = app self.app = app
self.setup()
@abstractmethod @abstractmethod
@ -158,8 +157,8 @@ class SqlCache(Cache):
def __init__(self, app: Application): def __init__(self, app: Application):
self._db = get_database(app.config)
Cache.__init__(self, app) Cache.__init__(self, app)
self._db = None
def get(self, namespace: str, key: str) -> Item: def get(self, namespace: str, key: str) -> Item:
@ -232,6 +231,10 @@ class SqlCache(Cache):
def setup(self) -> None: def setup(self) -> None:
if self._db.connected:
return
self._db = get_database(self.app.config)
self._db.connect() self._db.connect()
with self._db.session(True) as conn: with self._db.session(True) as conn:
@ -247,7 +250,11 @@ class SqlCache(Cache):
@register_cache @register_cache
class RedisCache(Cache): class RedisCache(Cache):
name: str = 'redis' name: str = 'redis'
_rd: Redis
def __init__(self, app: Application):
Cache.__init__(self, app)
self._rd = None
@property @property
@ -322,6 +329,9 @@ class RedisCache(Cache):
def setup(self) -> None: def setup(self) -> None:
if self._rd:
return
options = { options = {
'client_name': f'ActivityRelay_{self.app.config.domain}', 'client_name': f'ActivityRelay_{self.app.config.domain}',
'decode_responses': True, 'decode_responses': True,

View file

@ -10,7 +10,6 @@ import sys
import typing import typing
from aputils.signer import Signer from aputils.signer import Signer
from gunicorn.app.wsgiapp import WSGIApplication
from pathlib import Path from pathlib import Path
from shutil import copyfile from shutil import copyfile
from urllib.parse import urlparse from urllib.parse import urlparse
@ -208,9 +207,8 @@ def cli_setup(ctx: click.Context) -> None:
@cli.command('run') @cli.command('run')
@click.option('--dev', '-d', is_flag = True, help = 'Enable worker reloading on code change')
@click.pass_context @click.pass_context
def cli_run(ctx: click.Context, dev: bool = False) -> None: def cli_run(ctx: click.Context) -> None:
'Run the relay' 'Run the relay'
if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer: if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer:
@ -237,23 +235,12 @@ def cli_run(ctx: click.Context, dev: bool = False) -> None:
click.echo(pip_command) click.echo(pip_command)
return return
if getattr(sys, 'frozen', False): ctx.obj.run()
subprocess.run([sys.executable, 'run-gunicorn'], check = False)
else:
ctx.obj.run(dev)
# todo: figure out why the relay doesn't quit properly without this # todo: figure out why the relay doesn't quit properly without this
os._exit(0) os._exit(0)
@cli.command('run-gunicorn')
@click.pass_context
def cli_run_gunicorn(ctx: click.Context) -> None:
runner = GunicornRunner(ctx.obj)
runner.run()
@cli.command('convert') @cli.command('convert')
@click.option('--old-config', '-o', help = 'Path to the config file to convert from') @click.option('--old-config', '-o', help = 'Path to the config file to convert from')
@ -921,30 +908,6 @@ def cli_whitelist_import(ctx: click.Context) -> None:
click.echo('Imported whitelist from inboxes') click.echo('Imported whitelist from inboxes')
class GunicornRunner(WSGIApplication):
def __init__(self, app: Application):
self.app = app
self.app_uri = 'relay.application:main_gunicorn'
self.options = {
'bind': f'{app.config.listen}:{app.config.port}',
'worker_class': 'aiohttp.GunicornWebWorker',
'workers': app.config.workers,
'raw_env': f'CONFIG_FILE={app.config.path}'
}
WSGIApplication.__init__(self)
def load_config(self):
for key, value in self.options.items():
self.cfg.set(key, value)
def run(self):
logging.info('Starting webserver for %s', self.app.config.domain)
WSGIApplication.run(self)
def main() -> None: def main() -> None:
# pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter

View file

@ -3,7 +3,6 @@ aiohttp-swagger[performance]==1.0.16
aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz
argon2-cffi==23.1.0 argon2-cffi==23.1.0
click>=8.1.2 click>=8.1.2
gunicorn==21.1.0
hiredis==2.3.2 hiredis==2.3.2
pyyaml>=6.0 pyyaml>=6.0
redis==5.0.1 redis==5.0.1