sedi-relay/relay/application.py

248 lines
5.4 KiB
Python
Raw Normal View History

2022-11-07 12:54:32 +00:00
import asyncio
2022-12-13 13:27:09 +00:00
import inspect
2022-11-07 12:54:32 +00:00
import logging
import os
2022-11-20 10:50:14 +00:00
import queue
2022-11-07 12:54:32 +00:00
import signal
2022-11-20 10:50:14 +00:00
import threading
import traceback
2022-11-07 12:54:32 +00:00
from aiohttp import web
2022-12-13 13:27:09 +00:00
from aputils import Signer
2022-11-07 12:54:32 +00:00
from datetime import datetime, timedelta
2022-12-13 13:27:09 +00:00
from .config import Config
from .database import Database
from .http_client import HttpClient
2022-12-14 10:31:45 +00:00
from .logger import set_level
from .misc import DotDict, check_open_port, set_app
2022-11-07 12:54:32 +00:00
from .views import routes
class Application(web.Application):
def __init__(self, cfgpath):
2022-12-13 13:27:09 +00:00
web.Application.__init__(self,
middlewares = [
server_middleware
]
)
2022-11-07 12:54:32 +00:00
2022-12-13 13:27:09 +00:00
set_app(self)
2022-11-07 12:54:32 +00:00
2022-12-13 13:27:09 +00:00
self['config'] = Config(cfgpath)
self['database'] = Database(**self.config.dbconfig)
self['client'] = HttpClient()
2022-12-13 13:27:09 +00:00
self['starttime'] = None
self['signer'] = None
self['running'] = False
2022-11-20 10:50:14 +00:00
self['workers'] = []
self['last_worker'] = 0
2022-11-07 12:54:32 +00:00
2022-12-13 13:27:09 +00:00
self.database.create()
2022-11-18 19:10:39 +00:00
2022-12-14 10:31:45 +00:00
with self.database.session as s:
set_level(s.get_config('log_level'))
2022-11-07 12:54:32 +00:00
@property
def client(self):
return self['client']
2022-11-07 12:54:32 +00:00
@property
def config(self):
return self['config']
@property
def database(self):
return self['database']
2022-12-13 13:27:09 +00:00
@property
def signer(self):
if not self['signer']:
with self.database.session as s:
privkey = s.get_config('privkey')
if not privkey:
self['signer'] = Signer.new(self.config.keyid)
s.put_config('privkey', self['signer'].export())
else:
self['signer'] = Signer(privkey, self.config.keyid)
return self['signer']
2022-11-07 12:54:32 +00:00
@property
def uptime(self):
if not self['starttime']:
return timedelta(seconds=0)
2022-12-13 13:27:09 +00:00
return datetime.now() - self['starttime']
2022-11-07 12:54:32 +00:00
2022-11-20 10:50:14 +00:00
def push_message(self, inbox, message):
2022-12-13 13:27:09 +00:00
if len(self['workers']) <= 0:
return asyncio.ensure_future(self.client.post(inbox, message))
2022-11-20 10:50:14 +00:00
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message))
self['last_worker'] += 1
if self['last_worker'] >= len(self['workers']):
self['last_worker'] = 0
def set_signal_handler(self, enable=True):
2022-11-16 18:26:47 +00:00
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
try:
signal.signal(getattr(signal, sig), self.stop if enable else signal.SIG_DFL)
2022-11-16 18:26:47 +00:00
# some signals don't exist in windows, so skip them
except AttributeError:
pass
2022-11-07 12:54:32 +00:00
def run(self):
if not check_open_port(self.config.listen, self.config.port):
return logging.error(f'A server is already running on port {self.config.port}')
for route in routes:
self.router.add_route(*route)
logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
asyncio.run(self.handle_run())
2022-12-14 10:14:17 +00:00
self.database.disconnect()
2022-11-07 12:54:32 +00:00
def stop(self, *_):
self['running'] = False
2022-12-13 13:27:09 +00:00
def setup(self):
self.client.setup()
2022-11-07 12:54:32 +00:00
async def handle_run(self):
self.set_signal_handler(True)
2022-11-07 12:54:32 +00:00
self['running'] = True
2022-12-13 13:27:09 +00:00
with self.database.session as s:
workers = s.get_config('workers')
2022-11-20 10:50:14 +00:00
2022-12-13 13:27:09 +00:00
if workers > 0:
for i in range(workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
2022-11-20 10:50:14 +00:00
2022-11-07 12:54:32 +00:00
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup()
site = web.TCPSite(runner,
host = self.config.listen,
port = self.config.port,
reuse_address = True
)
await site.start()
self['starttime'] = datetime.now()
while self['running']:
await asyncio.sleep(0.25)
await site.stop()
self['starttime'] = None
self['running'] = False
2022-11-20 10:50:14 +00:00
self['workers'].clear()
self.set_signal_handler(False)
2022-11-10 17:39:37 +00:00
2022-11-20 10:50:14 +00:00
class PushWorker(threading.Thread):
def __init__(self, app):
threading.Thread.__init__(self)
self.app = app
self.queue = queue.Queue()
def run(self):
2022-12-13 13:27:09 +00:00
self.client = HttpClient()
self.client.setup()
2022-11-20 10:50:14 +00:00
asyncio.run(self.handle_queue())
async def handle_queue(self):
while self.app['running']:
try:
inbox, message = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose(f'New push from Thread-{threading.get_ident()}')
await self.client.post(inbox, message)
2022-11-20 10:50:14 +00:00
except queue.Empty:
pass
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
await self.client.close()
2022-11-20 10:50:14 +00:00
2022-12-13 13:27:09 +00:00
@web.middleware
async def server_middleware(request, handler):
if len(inspect.signature(handler).parameters) == 1:
response = await handler(request)
else:
with request.database.session as s:
response = await handler(request, s)
## make sure there's some sort of response
if response == None:
logging.error(f'No response for handler: {handler}')
response = Response.new_error(500, 'No response')
response.headers['Server'] = 'ActivityRelay'
return response
2022-11-18 21:38:39 +00:00
## Can't sub-class web.Request, so let's just add some properties
def request_actor(self):
try: return self['actor']
except KeyError: pass
def request_instance(self):
try: return self['instance']
except KeyError: pass
2022-11-18 21:38:39 +00:00
def request_message(self):
try: return self['message']
except KeyError: pass
def request_signature(self):
if 'signature' not in self._state:
try: self['signature'] = DotDict.new_from_signature(self.headers['signature'])
except KeyError: return
return self['signature']
setattr(web.Request, 'actor', property(request_actor))
setattr(web.Request, 'instance', property(request_instance))
2022-11-18 21:38:39 +00:00
setattr(web.Request, 'message', property(request_message))
setattr(web.Request, 'signature', property(request_signature))
setattr(web.Request, 'config', property(lambda self: self.app.config))
setattr(web.Request, 'database', property(lambda self: self.app.database))