From 0d77614bdbfd7de686d0c7d68c459b27cc671c8c Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sat, 9 Apr 2022 03:07:16 -0400 Subject: [PATCH] rework final draft --- relay/__init__.py | 10 -- relay/__main__.py | 60 +------ relay/application.py | 20 --- relay/config.py | 95 +++++++++-- relay/http_debug.py | 2 +- relay/manage.py | 369 ++++++++++++++++++++++++++++++++++++------- relay/misc.py | 23 +-- relay/processors.py | 22 ++- setup.cfg | 25 ++- 9 files changed, 434 insertions(+), 192 deletions(-) diff --git a/relay/__init__.py b/relay/__init__.py index ba80f3d..8fda65c 100644 --- a/relay/__init__.py +++ b/relay/__init__.py @@ -12,13 +12,3 @@ def set_app(app): global APP APP = app - - -#import argparse - -#parser = argparse.ArgumentParser( - #description="A generic LitePub relay (works with all LitePub consumers and Mastodon).", - #prog="python -m relay") -#parser.add_argument("-c", "--config", type=str, default="relay.yaml", - #metavar="", help="the path to your config file") -#args = parser.parse_args() diff --git a/relay/__main__.py b/relay/__main__.py index cbe5535..3280f12 100644 --- a/relay/__main__.py +++ b/relay/__main__.py @@ -1,63 +1,5 @@ -import Crypto -import asyncio -import logging -import platform -import sys - -from aiohttp.web import AppRunner, TCPSite - -from . import views -from .application import app - - -def crypto_check(): - vers_split = platform.python_version().split('.') - pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome' - - if Crypto.__version__ != '2.6.1': - return - - if int(vers_split[1]) > 7 and Crypto.__version__ == '2.6.1': - logging.error('PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...') - logging.error(pip_command) - sys.exit() - - else: - logging.warning('PyCrypto is old and should be replaced with pycryptodome') - logging.warning(pip_command) - - -async def start_webserver(): - config = app['config'] - runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"') - - logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})') - await runner.setup() - - site = TCPSite(runner, config.listen, config.port) - await site.start() - -def main(): - # web pages - app.router.add_get('/', views.home) - - # endpoints - app.router.add_post('/actor', views.inbox) - app.router.add_post('/inbox', views.inbox) - app.router.add_get('/actor', views.actor) - app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0) - app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown) - app.router.add_get('/.well-known/webfinger', views.webfinger) - - if logging.DEBUG <= logging.root.level: - app.router.add_get('/stats', views.stats) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - asyncio.ensure_future(start_webserver(), loop=loop) - loop.run_forever() +from .manage import main if __name__ == '__main__': - crypto_check() main() diff --git a/relay/application.py b/relay/application.py index c4d971d..39f7bd0 100644 --- a/relay/application.py +++ b/relay/application.py @@ -3,12 +3,9 @@ import logging import sys from aiohttp.web import Application -from cachetools import LRUCache from . import set_app from . import views -from .config import DotDict, RelayConfig -from .database import RelayDatabase from .middleware import http_signatures_middleware @@ -16,21 +13,4 @@ app = Application(middlewares=[ http_signatures_middleware ]) -app['config'] = RelayConfig('relay.yaml') - -if not app['config'].load(): - app['config'].save() - - logging.error('Relay is not setup. Change the host in relay.yaml at the least and try again') - sys.exit(1) - -app['database'] = RelayDatabase(app['config']) -app['database'].load() - -app['cache'] = DotDict() -app['semaphore'] = asyncio.Semaphore(app['config']['push_limit']) - -for key in app['config'].cachekeys: - app['cache'][key] = LRUCache(app['config'][key]) - set_app(app) diff --git a/relay/config.py b/relay/config.py index 4521ed2..df70f2d 100644 --- a/relay/config.py +++ b/relay/config.py @@ -47,10 +47,10 @@ class DotDict(dict): class RelayConfig(DotDict): apkeys = { 'host', + 'whitelist_enabled', 'blocked_software', 'blocked_instances', - 'whitelist', - 'whitelist_enabled' + 'whitelist' } cachekeys = { @@ -61,13 +61,15 @@ class RelayConfig(DotDict): def __init__(self, path): + self._path = Path(path).expanduser().resolve() + super().__init__({ - 'db': 'relay.jsonld', + 'db': f'{self._path.stem}.jsonld', 'listen': '0.0.0.0', 'port': 8080, 'note': 'Make a note about your instance here.', 'push_limit': 512, - 'host': 'example.com', + 'host': 'relay.example.com', 'blocked_software': [], 'blocked_instances': [], 'whitelist': [], @@ -77,8 +79,6 @@ class RelayConfig(DotDict): 'digests': 1024 }) - self._path = Path(path).expanduser().resolve() - def __setitem__(self, key, value): if key in ['blocked_instances', 'blocked_software', 'whitelist']: @@ -118,12 +118,85 @@ class RelayConfig(DotDict): return f'{self.actor}#main-key' - def is_banned(self, inbox): - return urlparse(inbox).hostname in self.blocked_instances + def ban_instance(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + if self.is_banned(instance): + return False + + self.blocked_instances.append(instance) + return True - def is_whitelisted(self, inbox): - return urlparse(inbox).hostname in self.whitelist + def unban_instance(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + try: + self.blocked_instances.remove(instance) + return True + + except: + return False + + + def ban_software(self, software): + if self.is_banned_software(software): + return False + + self.blocked_software.append(software) + return True + + + def unban_software(self, software): + try: + self.blocked_software.remove(software) + return True + + except: + return False + + + def add_whitelist(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + if self.is_whitelisted(instance): + return False + + self.whitelist.append(instance) + return True + + + def del_whitelist(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + try: + self.whitelist.remove(instance) + return True + + except: + return False + + + def is_banned(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + return instance in self.blocked_instances + + + def is_banned_software(self, software): + return software in self.blocked_software + + + def is_whitelisted(self, instance): + if instance.startswith('http'): + instance = urlparse(instance).hostname + + return instance in self.whitelist def load(self): @@ -166,7 +239,7 @@ class RelayConfig(DotDict): def save(self): config = { - 'db': self.db, + 'db': self['db'], 'listen': self.listen, 'port': self.port, 'note': self.note, diff --git a/relay/http_debug.py b/relay/http_debug.py index a3f7515..2a2ae67 100644 --- a/relay/http_debug.py +++ b/relay/http_debug.py @@ -58,7 +58,7 @@ async def on_request_exception(session, trace_config_ctx, params): def http_debug(): - if logging.DEBUG <= logging.root.level: + if logging.DEBUG >= logging.root.level: return trace_config = aiohttp.TraceConfig() diff --git a/relay/manage.py b/relay/manage.py index 152bd33..44bca8e 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -1,80 +1,331 @@ +import Crypto import asyncio -import sys -import simplejson as json +import click +import json +import logging +import platform +from aiohttp.web import AppRunner, TCPSite +from cachetools import LRUCache + +from . import misc, views from .application import app +from .config import DotDict, RelayConfig +from .database import RelayDatabase from .misc import follow_remote_actor, unfollow_remote_actor -def relay_list(): - print('Connected to the following instances or relays:') - [print('-', inbox) for inbox in app['database'].inboxes] +@click.group('cli', context_settings={'show_default': True}, invoke_without_command=True) +@click.option('--config', '-c', default='relay.yaml', help='path to the relay\'s config') +@click.pass_context +def cli(ctx, config): + app['config'] = RelayConfig(config) + + if not app['config'].load(): + app['config'].save() + + app['database'] = RelayDatabase(app['config']) + app['database'].load() + + app['cache'] = DotDict() + app['semaphore'] = asyncio.Semaphore(app['config']['push_limit']) + + for key in app['config'].cachekeys: + app['cache'][key] = LRUCache(app['config'][key]) + + if not ctx.invoked_subcommand: + relay_run.callback() -def relay_follow(): - if len(sys.argv) < 3: - print('usage: python3 -m relay.manage follow ') - exit() +@cli.command('list') +@click.argument('type', required=False, default='inbox') +def relay_list(type): + 'List all following instances' - target = sys.argv[2] + assert type in [None, 'inbox', 'ban', 'whitelist'] - loop = asyncio.get_event_loop() - loop.run_until_complete(follow_remote_actor(target)) + config = app['config'] + database = app['database'] - print('Sent follow message to:', target) + if not type or type == 'inbox': + click.echo('Connected to the following instances or relays:') + + for inbox in database.inboxes: + click.echo(f'- {inbox}') + + elif type == 'ban': + click.echo('Banned instances:') + + for instance in config.blocked_instances: + click.echo(f'- {instance}') + + click.echo('\nBanned software:') + + for software in config.blocked_software: + click.echo(f'- {software}') + + elif type == 'whitelist': + click.echo('Whitelisted instances:') + + for instance in config.whitelist: + click.echo(f'- {instance}') -def relay_unfollow(): - if len(sys.argv) < 3: - print('usage: python3 -m relay.manage unfollow ') - exit() +@cli.command('follow') +@click.argument('actor') +def relay_follow(actor): + 'Follow an actor (Relay must be running)' - target = sys.argv[2] - - loop = asyncio.get_event_loop() - loop.run_until_complete(unfollow_remote_actor(target)) - - print('Sent unfollow message to:', target) - -def relay_forceremove(): - if len(sys.argv) < 3: - print('usage: python3 -m relay.manage force-remove ') - exit() - - target = sys.argv[2] - - try: - app['database'].del_inbox(target) - print('Removed inbox from DB:', target) - - except KeyError: - print('Failed to remove inbox from DB:', target) + loop = asyncio.new_event_loop() + loop.run_until_complete(handle_follow_actor(actor)) -TASKS = { - 'list': relay_list, - 'follow': relay_follow, - 'unfollow': relay_unfollow, - 'force-remove': relay_forceremove -} +@cli.command('unfollow') +@click.argument('actor') +def relay_follow(actor): + 'Unfollow an actor (Relay must be running)' + + loop = asyncio.new_event_loop() + loop.run_until_complete(handle_unfollow_actor(actor)) -def usage(): - print('usage: python3 -m relay.manage [...]') - print('tasks:') - [print('-', task) for task in TASKS.keys()] - exit() +@cli.command('add') +@click.argument('inbox') +def relay_add(inbox): + 'Add an inbox to the database' + + database = app['database'] + config = app['config'] + + if not inbox.startswith('http'): + inbox = f'https://{inbox}/inbox' + + if database.get_inbox(inbox): + click.echo(f'Error: Inbox already in database: {inbox}') + return + + if database.get_inbox(inbox): + click.echo(f'Error: Already added inbox: {inbox}') + return + + if config.is_banned(inbox): + click.echo(f'Error: Refusing to add banned inbox: {inbox}') + return + + database.add_inbox(inbox) + database.save() + click.echo(f'Added inbox to the database: {inbox}') + + +@cli.command('remove') +@click.argument('inbox') +def relay_remove(inbox): + 'Remove an inbox from the database' + + database = app['database'] + dbinbox = database.get_inbox(inbox) + + if not dbinbox: + click.echo(f'Error: Inbox does not exist: {inbox}') + return + + database.del_inbox(dbinbox) + database.save() + click.echo(f'Removed inbox from the database: {inbox}') + + +# todo: add nested groups +@cli.command('ban') +@click.argument('type') +@click.argument('target') +def relay_ban(type, target): + 'Ban an instance or software' + + assert type in ['instance', 'software'] + + config = app['config'] + database = app['database'] + inbox = database.get_inbox(target) + + bancmd = getattr(config, f'ban_{type}') + + if bancmd(target): + config.save() + + if inbox: + database.del_inbox(inbox) + database.save() + + click.echo(f'Banned {type}: {target}') + return + + click.echo(f'{type.title()} already banned: {target}') + + +@cli.command('unban') +@click.argument('type') +@click.argument('target') +def relay_unban(type, target): + 'Unban an instance or software' + + assert type in ['instance', 'software'] + + config = app['config'] + database = app['database'] + + unbancmd = getattr(config, f'unban_{type}') + + if unbancmd(target): + config.save() + + return click.echo(f'Unbanned {type}: {target}') + + return click.echo(f'{type.title()} is not banned: {target}') + + +@cli.command('allow') +@click.argument('instance') +def relay_allow(instance): + 'Add an instance to the whitelist' + + config = app['config'] + + if not config.add_whitelist(instance): + return click.echo(f'Instance already in the whitelist: {instance}') + + config.save() + click.echo(f'Instance added to the whitelist: {instance}') + + +@cli.command('deny') +@click.argument('instance') +def relay_deny(instance): + 'Remove an instance from the whitelist' + + config = app['config'] + database = app['database'] + inbox = database.get_inbox(instance) + + if not config.del_whitelist(instance): + return click.echo(f'Instance not in the whitelist: {instance}') + + config.save() + + if inbox and config.whitelist_enabled: + database.del_inbox(inbox) + database.save() + + click.echo(f'Removed instance from the whitelist: {instance}') + + +@cli.command('setup') +def relay_setup(): + 'Generate a new config' + + config = app['config'] + + while True: + config.host = click.prompt('What domain will the relay be hosted on?', default=config.host) + + if not config.host.endswith('example.com'): + break + + click.echo('The domain must not be example.com') + + config.listen = click.prompt('Which address should the relay listen on?', default=config.listen) + + while True: + config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int) + break + + config.save() + + if click.confirm('Relay all setup! Would you like to run it now?'): + relay_run.callback() + + +@cli.command('run') +def relay_run(): + 'Run the relay' + + if app['config'].host.endswith('example.com'): + return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".') + + vers_split = platform.python_version().split('.') + pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome' + + if Crypto.__version__ == '2.6.1': + if int(vers_split[1]) > 7: + click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...') + return click.echo(pip_command) + + else: + click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome') + return click.echo(pip_command) + + # web pages + app.router.add_get('/', views.home) + + # endpoints + app.router.add_post('/actor', views.inbox) + app.router.add_post('/inbox', views.inbox) + app.router.add_get('/actor', views.actor) + app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0) + app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown) + app.router.add_get('/.well-known/webfinger', views.webfinger) + + if logging.DEBUG >= logging.root.level: + app.router.add_get('/stats', views.stats) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + asyncio.ensure_future(handle_start_webserver(), loop=loop) + loop.run_forever() + + +async def handle_follow_actor(app, target): + database = app['database'] + config = app['config'] + + if not target.startswith('http'): + target = f'https://{target}/actor' + + if database.get_inbox(target): + return click.echo(f'Error: Already following actor: {target}') + + if config.is_banned(target): + return click.echo(f'Error: Refusing to follow banned actor: {target}') + + if config.whitelist_enabled and not config.is_whitelisted(target): + return click.echo(f'Error: Refusing to follow non-whitelisted actor: {target}') + + await misc.follow_remote_actor(target) + click.echo(f'Sent follow message to: {target}') + + +async def handle_unfollow_actor(app, target): + database = app['database'] + + if not target.startswith('http'): + target = f'https://{target}/actor' + + if not database.get_inbox(target): + return click.echo(f'Error: Not following actor: {target}') + + await misc.unfollow_remote_actor(target) + click.echo(f'Sent unfollow message to: {target}') + + +async def handle_start_webserver(): + config = app['config'] + runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"') + + logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})') + await runner.setup() + + site = TCPSite(runner, config.listen, config.port) + await site.start() def main(): - if len(sys.argv) < 2: - usage() - - if sys.argv[1] in TASKS: - TASKS[sys.argv[1]]() - else: - usage() - - -if __name__ == '__main__': - main() + cli(prog_name='relay') diff --git a/relay/misc.py b/relay/misc.py index e19bff1..0ca03e4 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -9,6 +9,7 @@ from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 from aiohttp import ClientSession from datetime import datetime +from json.decoder import JSONDecodeError from urllib.parse import urlparse from uuid import uuid4 @@ -60,7 +61,7 @@ def distill_inboxes(actor, object_id): targets = [] for inbox in database.inboxes: - if inbox != actor_inbox or urlparse(inbox).hostname != original_hostname: + if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname: targets.append(inbox) return targets @@ -245,11 +246,15 @@ async def request(uri, data=None, force=False, sign_headers=True): headers.update(signing_headers) try: + # json_serializer=DotDict maybe? async with ClientSession(trace_configs=http_debug()) as session, get_app()['semaphore']: async with session.request(method, uri, headers=headers, data=data) as resp: - if resp.status not in [200, 202]: - resp_payload = await resp.text(encoding='utf-8') + ## aiohttp has been known to leak if the response hasn't been read, + ## so we're just gonna read the request no matter what + resp_data = await resp.read() + resp_payload = json.loads(resp_data.decode('utf-8')) + if resp.status not in [200, 202]: if not data: logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}') return @@ -257,20 +262,16 @@ async def request(uri, data=None, force=False, sign_headers=True): logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}') return - try: - resp_payload = await resp.json(encoding='utf-8', content_type=None) - - except: - return - logging.debug(f'{uri} >> resp {resp_payload}') get_app()['cache'].json[uri] = resp_payload return resp_payload - except Exception as e: + except JSONDecodeError: + return + + except Exception: traceback.print_exc() - return None async def validate_signature(actor, http_request): diff --git a/relay/processors.py b/relay/processors.py index 4ce2684..7a7e3ad 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -13,10 +13,12 @@ async def handle_relay(actor, data, request): object_id = misc.distill_object_id(data) if object_id in cache: - logging.debug(f'>> already relayed {object_id} as {cache[object_id]}') + logging.verbose(f'already relayed {object_id} as {cache[object_id]}') return - activity_id = f"https://{request.host}/activities/{uuid.uuid4()}" + logging.verbose(f'Relaying post from {actor["id"]}') + + activity_id = f"https://{request.host}/activities/{uuid4()}" message = { "@context": "https://www.w3.org/ns/activitystreams", @@ -29,11 +31,10 @@ async def handle_relay(actor, data, request): logging.debug(f'>> relay: {message}') - inboxes = misc.distill_inboxes(actor['id'], object_id) - + inboxes = misc.distill_inboxes(actor, object_id) futures = [misc.request(inbox, data=message) for inbox in inboxes] - asyncio.ensure_future(asyncio.gather(*futures)) + asyncio.ensure_future(asyncio.gather(*futures)) cache[object_id] = activity_id @@ -42,9 +43,10 @@ async def handle_forward(actor, data, request): object_id = misc.distill_object_id(data) if object_id in cache: - logging.debug(f'>> already forwarded {object_id}.') + logging.verbose(f'already forwarded {object_id}') return + logging.verbose(f'Forwarding post from {actor["id"]}') logging.debug(f'>> Relay {data}') inboxes = misc.distill_inboxes(actor['id'], object_id) @@ -87,7 +89,12 @@ async def handle_follow(actor, data, request): async def handle_undo(actor, data, request): - if data['object']['type'] != 'Follow': + ## If the activity being undone is an Announce, forward it insteead + if data['object']['type'] == 'Announce': + await handle_forward(actor, data, request) + return + + elif data['object']['type'] != 'Follow': return database = app['database'] @@ -113,4 +120,5 @@ processors = { async def run_processor(request, data, actor): + logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}') return await processors[data['type']](actor, data, request) diff --git a/setup.cfg b/setup.cfg index 43c041c..592bf04 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = relay -version = 0.1.0 +version = 0.2.0 description = Generic LitePub relay (works with all LitePub consumers and Mastodon) long_description = file: README.md long_description_content_type = text/markdown; charset=UTF-8 @@ -23,17 +23,14 @@ project_urls = zip_safe = False packages = find: install_requires = - aiohttp>=3.5.4 - async-timeout>=3.0.0 - attrs>=18.1.0 - chardet>=3.0.4 - idna>=2.7 - idna-ssl>=1.1.0; python_version < "3.7" - multidict>=4.3.1 - pycryptodome>=3.9.4 - PyYAML>=5.1 - simplejson>=3.16.0 - yarl>=1.2.6 - cachetools - async_lru + aiohttp >= 3.8.0 + cachetools >= 5.0.0 + click >= 8.1.2 + pycryptodome >= 3.14.1 + PyYAML >= 5.0.0 python_requires = >=3.6 + +[options.entry_points] +console_scripts = + activityrelay = relay.manage:main +