use gunicorn to start the server

This commit is contained in:
Izalia Mae 2024-02-05 13:15:08 -05:00
parent 02ac1fa53b
commit bec5d5f207
5 changed files with 117 additions and 107 deletions

View file

@ -1,15 +1,17 @@
from __future__ import annotations
import asyncio
import queue
import os
import signal
import threading
import traceback
import subprocess
import sys
import time
import typing
from aiohttp import web
from aputils.signer import Signer
from datetime import datetime, timedelta
from gunicorn.app.wsgiapp import WSGIApplication
from . import logger as logging
from .cache import get_cache
@ -20,6 +22,7 @@ from .misc import check_open_port
from .views import VIEWS
if typing.TYPE_CHECKING:
from collections.abc import Awaitable
from tinysql import Database, Row
from typing import Any
from .cache import Cache
@ -31,21 +34,24 @@ if typing.TYPE_CHECKING:
class Application(web.Application):
DEFAULT: Application = None
def __init__(self, cfgpath: str):
def __init__(self, cfgpath: str, gunicorn: bool = False):
web.Application.__init__(self)
Application.DEFAULT = self
self['proc'] = None
self['signer'] = None
self['start_time'] = None
self['config'] = Config(cfgpath, load = True)
self['database'] = get_database(self.config)
self['client'] = HttpClient()
self['cache'] = get_cache(self)
self['workers'] = []
self['last_worker'] = 0
self['start_time'] = None
self['running'] = False
if not gunicorn:
return
self.on_response_prepare.append(handle_access_log)
for path, view in VIEWS:
self.router.add_view(path, view)
@ -96,17 +102,16 @@ class Application(web.Application):
def push_message(self, inbox: str, message: Message, instance: Row) -> None:
if self.config.workers <= 0:
asyncio.ensure_future(self.client.post(inbox, message, instance))
return
asyncio.ensure_future(self.client.post(inbox, message, instance))
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message, instance))
self['last_worker'] += 1
def run(self, dev: bool = False) -> None:
self.start(dev)
if self['last_worker'] >= len(self['workers']):
self['last_worker'] = 0
while self['proc'] and self['proc'].poll() is None:
time.sleep(0.1)
self.stop()
def set_signal_handler(self, startup: bool) -> None:
@ -119,91 +124,101 @@ class Application(web.Application):
pass
def run(self) -> None:
if not check_open_port(self.config.listen, self.config.port):
logging.error('A server is already running on port %i', self.config.port)
def start(self, dev: bool = False) -> None:
if self['proc']:
return
for view in VIEWS:
self.router.add_view(*view)
if not check_open_port(self.config.listen, self.config.port):
logging.error('Server already running on %s:%s', self.config.listen, self.config.port)
return
logging.info(
'Starting webserver at %s (%s:%i)',
self.config.domain,
self.config.listen,
self.config.port
)
cmd = [
sys.executable, '-m', 'gunicorn',
'relay.application:main_gunicorn',
'--bind', f'{self.config.listen}:{self.config.port}',
'--worker-class', 'aiohttp.GunicornWebWorker',
'--workers', str(self.config.workers),
'--env', f'CONFIG_FILE={self.config.path}'
]
asyncio.run(self.handle_run())
def stop(self, *_: Any) -> None:
self['running'] = False
async def handle_run(self) -> None:
self['running'] = True
if dev:
cmd.append('--reload')
self.set_signal_handler(True)
if self.config.workers > 0:
for _ in range(self.config.workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(
runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['start_time'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
await self.client.close()
self['start_time'] = None
self['running'] = False
self['workers'].clear()
self['proc'] = subprocess.Popen(cmd) # pylint: disable=consider-using-with
class PushWorker(threading.Thread):
def stop(self, *_) -> None:
if not self['proc']:
return
self['proc'].terminate()
time_wait = 0.0
while self['proc'].poll() is None:
time.sleep(0.1)
time_wait += 0.1
if time_wait >= 5.0:
self['proc'].kill()
break
self.set_signal_handler(False)
self['proc'] = None
# not used, but keeping just in case
class GunicornRunner(WSGIApplication):
def __init__(self, app: Application):
threading.Thread.__init__(self)
self.app = app
self.queue = queue.Queue()
self.client = None
self.app_uri = 'relay.application:main_gunicorn'
self.options = {
'bind': f'{app.config.listen}:{app.config.port}',
'worker_class': 'aiohttp.GunicornWebWorker',
'workers': app.config.workers,
'raw_env': f'CONFIG_FILE={app.config.path}'
}
WSGIApplication.__init__(self)
def run(self) -> None:
asyncio.run(self.handle_queue())
def load_config(self):
for key, value in self.options.items():
self.cfg.set(key, value)
async def handle_queue(self) -> None:
self.client = HttpClient()
def run(self):
logging.info('Starting webserver for %s', self.app.config.domain)
WSGIApplication.run(self)
while self.app['running']:
try:
inbox, message, instance = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose('New push from Thread-%i', threading.get_ident())
await self.client.post(inbox, message, instance)
except queue.Empty:
pass
async def handle_access_log(request: web.Request, response: web.Response) -> None:
address = request.headers.get(
'X-Forwarded-For',
request.headers.get(
'X-Real-Ip',
request.remote
)
)
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
logging.info(
'%s "%s %s" %i %i "%s"',
address,
request.method,
request.path,
response.status,
len(response.body),
request.headers.get('User-Agent', 'n/a')
)
await self.client.close()
async def main_gunicorn():
try:
app = Application(os.environ['CONFIG_FILE'], gunicorn = True)
except KeyError:
logging.error('Failed to set "CONFIG_FILE" environment. Trying to run without gunicorn?')
raise
return app

View file

@ -70,7 +70,6 @@ error: Callable = logging.error
critical: Callable = logging.critical
logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE')
env_log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
try:
@ -79,22 +78,15 @@ try:
except KeyError:
env_log_file = None
try:
log_level = LogLevel[env_log_level]
except KeyError:
print('Invalid log level:', env_log_level)
log_level = LogLevel['INFO']
handlers = [logging.StreamHandler()]
if env_log_file:
handlers.append(logging.FileHandler(env_log_file))
logging.addLevelName(LogLevel['VERBOSE'], 'VERBOSE')
logging.basicConfig(
level = log_level,
level = LogLevel.INFO,
format = '[%(asctime)s] %(levelname)s: %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
handlers = handlers
)

View file

@ -18,7 +18,7 @@ from .application import Application
from .compat import RelayConfig, RelayDatabase
from .database import get_database
from .database.connection import RELAY_SOFTWARE
from .misc import IS_DOCKER, Message, check_open_port
from .misc import IS_DOCKER, Message
if typing.TYPE_CHECKING:
from tinysql import Row
@ -70,6 +70,11 @@ def cli(ctx: click.Context, config: str) -> None:
cli_setup.callback()
else:
click.echo(
'[DEPRECATED] Running the relay without the "run" command will be removed in the ' +
'future.'
)
cli_run.callback()
@ -200,8 +205,9 @@ def cli_setup(ctx: click.Context) -> None:
@cli.command('run')
@click.option('--dev', '-d', is_flag = True, help = 'Enable worker reloading on code change')
@click.pass_context
def cli_run(ctx: click.Context) -> None:
def cli_run(ctx: click.Context, dev: bool = False) -> None:
'Run the relay'
if ctx.obj.config.domain.endswith('example.com') or not ctx.obj.signer:
@ -228,11 +234,7 @@ def cli_run(ctx: click.Context) -> None:
click.echo(pip_command)
return
if not check_open_port(ctx.obj.config.listen, ctx.obj.config.port):
click.echo(f'Error: A server is already running on port {ctx.obj.config.port}')
return
ctx.obj.run()
ctx.obj.run(dev)
@cli.command('convert')

View file

@ -14,9 +14,9 @@ from functools import cached_property
from uuid import uuid4
if typing.TYPE_CHECKING:
from collections.abc import Coroutine, Generator
from collections.abc import Awaitable, Coroutine, Generator
from tinysql import Connection
from typing import Any, Awaitable
from typing import Any
from .application import Application
from .cache import Cache
from .config import Config
@ -236,7 +236,7 @@ class Response(AiohttpResponse):
class View(AbstractView):
def __await__(self) -> Generator[Response]:
if (self.request.method) not in METHODS:
if self.request.method not in METHODS:
raise HTTPMethodNotAllowed(self.request.method, self.allowed_methods)
if not (handler := self.handlers.get(self.request.method)):

View file

@ -1,6 +1,7 @@
aiohttp>=3.9.1
aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.6a.tar.gz
click>=8.1.2
gunicorn==21.1.0
hiredis==2.3.2
pyyaml>=6.0
redis==5.0.1