diff --git a/docs/installation.md b/docs/installation.md index 2e93904..a852ab1 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -15,7 +15,7 @@ the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in- Now simply install ActivityRelay directly from git - pipx install git+https://git.pleroma.social/pleroma/relay@0.2.0 + pipx install git+https://git.pleroma.social/pleroma/relay@0.2.3 Or from a cloned git repo. @@ -39,7 +39,7 @@ be installed via [pyenv](https://github.com/pyenv/pyenv). The instructions for installation via pip are very similar to pipx. Installation can be done from git - python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.0 + python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.3 or a cloned git repo. diff --git a/relay.spec b/relay.spec new file mode 100644 index 0000000..57fedc7 --- /dev/null +++ b/relay.spec @@ -0,0 +1,44 @@ +# -*- mode: python ; coding: utf-8 -*- + + +block_cipher = None + + +a = Analysis( + ['relay/__main__.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + win_no_prefer_redirects=False, + win_private_assemblies=False, + cipher=block_cipher, + noarchive=False, +) +pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.zipfiles, + a.datas, + [], + name='activityrelay', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=True, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/relay.yaml.example b/relay.yaml.example index 26749e4..d123e08 100644 --- a/relay.yaml.example +++ b/relay.yaml.example @@ -10,7 +10,7 @@ port: 8080 note: "Make a note about your instance here." 
# maximum number of inbox posts to do at once -post_limit: 512 +push_limit: 512 # this section is for ActivityPub ap: @@ -39,5 +39,5 @@ ap: # cache limits as number of items. only change this if you know what you're doing cache: objects: 1024 - actors: 1024 + json: 1024 digests: 1024 diff --git a/relay/__init__.py b/relay/__init__.py index 9489f87..82aaa8a 100644 --- a/relay/__init__.py +++ b/relay/__init__.py @@ -1,8 +1,3 @@ -__version__ = '0.2.2' - -from aiohttp.web import Application +__version__ = '0.2.3' from . import logger - - -app = Application() diff --git a/relay/__main__.py b/relay/__main__.py index 3280f12..8ed335a 100644 --- a/relay/__main__.py +++ b/relay/__main__.py @@ -1,4 +1,4 @@ -from .manage import main +from relay.manage import main if __name__ == '__main__': diff --git a/relay/application.py b/relay/application.py new file mode 100644 index 0000000..cc2815b --- /dev/null +++ b/relay/application.py @@ -0,0 +1,127 @@ +import asyncio +import logging +import os +import signal + +from aiohttp import web +from cachetools import LRUCache +from datetime import datetime, timedelta + +from .config import RelayConfig +from .database import RelayDatabase +from .misc import DotDict, check_open_port, set_app +from .views import routes + + +class Application(web.Application): + def __init__(self, cfgpath): + web.Application.__init__(self) + + self['starttime'] = None + self['running'] = False + self['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) + self['config'] = RelayConfig(cfgpath, self['is_docker']) + + if not self['config'].load(): + self['config'].save() + + self['database'] = RelayDatabase(self['config']) + self['database'].load() + + self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys}) + self['semaphore'] = asyncio.Semaphore(self['config'].push_limit) + + self.set_signal_handler() + set_app(self) + + + @property + def cache(self): + return self['cache'] + + + @property + def config(self): + 
return self['config'] + + + @property + def database(self): + return self['database'] + + + @property + def is_docker(self): + return self['is_docker'] + + + @property + def semaphore(self): + return self['semaphore'] + + + @property + def uptime(self): + if not self['starttime']: + return timedelta(seconds=0) + + uptime = datetime.now() - self['starttime'] + + return timedelta(seconds=uptime.seconds) + + + def set_signal_handler(self): + for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}: + try: + signal.signal(getattr(signal, sig), self.stop) + + # some signals don't exist in windows, so skip them + except AttributeError: + pass + + + def run(self): + if not check_open_port(self.config.listen, self.config.port): + return logging.error(f'A server is already running on port {self.config.port}') + + for route in routes: + if route[1] == '/stats' and logging.DEBUG < logging.root.level: + continue + + self.router.add_route(*route) + + logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})') + asyncio.run(self.handle_run()) + + + def stop(self, *_): + self['running'] = False + + + async def handle_run(self): + self['running'] = True + + runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"') + await runner.setup() + + site = web.TCPSite(runner, + host = self.config.listen, + port = self.config.port, + reuse_address = True + ) + + await site.start() + self['starttime'] = datetime.now() + + while self['running']: + await asyncio.sleep(0.25) + + await site.stop() + + self['starttime'] = None + self['running'] = False + + +class Cache(LRUCache): + def set_maxsize(self, value): + self.__maxsize = int(value) diff --git a/relay/config.py b/relay/config.py index 951b6f3..fd22f22 100644 --- a/relay/config.py +++ b/relay/config.py @@ -4,6 +4,8 @@ import yaml from pathlib import Path from urllib.parse import urlparse +from .misc import DotDict + relay_software_names = [ 'activityrelay', @@ 
-13,45 +15,6 @@ relay_software_names = [ ] -class DotDict(dict): - def __getattr__(self, k): - try: - return self[k] - - except KeyError: - raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None - - - def __setattr__(self, k, v): - try: - if k in self._ignore_keys: - super().__setattr__(k, v) - - except AttributeError: - pass - - if k.startswith('_'): - super().__setattr__(k, v) - - else: - self[k] = v - - - def __setitem__(self, k, v): - if type(v) == dict: - v = DotDict(v) - - super().__setitem__(k, v) - - - def __delattr__(self, k): - try: - dict.__delitem__(self, k) - - except KeyError: - raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None - - class RelayConfig(DotDict): apkeys = { 'host', @@ -69,27 +32,15 @@ class RelayConfig(DotDict): def __init__(self, path, is_docker): + DotDict.__init__(self, {}) + if is_docker: path = '/data/relay.yaml' self._isdocker = is_docker self._path = Path(path).expanduser() - super().__init__({ - 'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')), - 'listen': '0.0.0.0', - 'port': 8080, - 'note': 'Make a note about your instance here.', - 'push_limit': 512, - 'host': 'relay.example.com', - 'blocked_software': [], - 'blocked_instances': [], - 'whitelist': [], - 'whitelist_enabled': False, - 'json': 1024, - 'objects': 1024, - 'digests': 1024 - }) + self.reset() def __setitem__(self, key, value): @@ -133,6 +84,24 @@ class RelayConfig(DotDict): return f'{self.actor}#main-key' + def reset(self): + self.clear() + self.update({ + 'db': str(self._path.parent.joinpath(f'{self._path.stem}.jsonld')), + 'listen': '0.0.0.0', + 'port': 8080, + 'note': 'Make a note about your instance here.', + 'push_limit': 512, + 'host': 'relay.example.com', + 'blocked_software': [], + 'blocked_instances': [], + 'whitelist': [], + 'whitelist_enabled': False, + 'json': 1024, + 'objects': 1024, + 'digests': 1024 + }) + def ban_instance(self, instance): if 
instance.startswith('http'): instance = urlparse(instance).hostname @@ -218,6 +187,8 @@ class RelayConfig(DotDict): def load(self): + self.reset() + options = {} try: diff --git a/relay/database.py b/relay/database.py index 6e71f56..90b1473 100644 --- a/relay/database.py +++ b/relay/database.py @@ -6,10 +6,16 @@ from Crypto.PublicKey import RSA from urllib.parse import urlparse -class RelayDatabase: +class RelayDatabase(dict): def __init__(self, config): + dict.__init__(self, { + 'relay-list': {}, + 'private-key': None, + 'follow-requests': {}, + 'version': 1 + }) + self.config = config - self.data = None self.PRIVKEY = None @@ -25,26 +31,22 @@ class RelayDatabase: @property def privkey(self): - try: - return self.data['private-key'] - - except KeyError: - return False + return self['private-key'] @property def hostnames(self): - return [urlparse(inbox).hostname for inbox in self.inboxes] + return tuple(self['relay-list'].keys()) @property def inboxes(self): - return self.data.get('relay-list', []) + return tuple(data['inbox'] for data in self['relay-list'].values()) def generate_key(self): self.PRIVKEY = RSA.generate(4096) - self.data['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8') + self['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8') def load(self): @@ -52,14 +54,31 @@ class RelayDatabase: try: with self.config.db.open() as fd: - self.data = json.load(fd) + data = json.load(fd) - key = self.data.pop('actorKeys', None) + self['version'] = data.get('version', None) + self['private-key'] = data.get('private-key') - if key: - self.data['private-key'] = key.get('privateKey') + if self['version'] == None: + self['version'] = 1 + + if 'actorKeys' in data: + self['private-key'] = data['actorKeys']['privateKey'] + + for item in data.get('relay-list', []): + domain = urlparse(item).hostname + self['relay-list'][domain] = { + 'inbox': item, + 'followid': None + } + + else: + self['relay-list'] = data.get('relay-list', {}) + + for domain in 
self['relay-list'].keys(): + if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)): + self.del_inbox(domain) - self.data.pop('actors', None) new_db = False except FileNotFoundError: @@ -69,14 +88,6 @@ class RelayDatabase: if self.config.db.stat().st_size > 0: raise e from None - if not self.data: - logging.info('No database was found. Making a new one.') - self.data = {} - - for inbox in self.inboxes: - if self.config.is_banned(inbox) or (self.config.whitelist_enabled and not self.config.is_whitelisted(inbox)): - self.del_inbox(inbox) - if not self.privkey: logging.info("No actor keys present, generating 4096-bit RSA keypair.") self.generate_key() @@ -90,34 +101,99 @@ class RelayDatabase: def save(self): with self.config.db.open('w') as fd: - data = { - 'relay-list': self.inboxes, - 'private-key': self.privkey - } - - json.dump(data, fd, indent=4) + json.dump(self, fd, indent=4) - def get_inbox(self, domain): + def get_inbox(self, domain, fail=False): if domain.startswith('http'): domain = urlparse(domain).hostname - for inbox in self.inboxes: - if domain == urlparse(inbox).hostname: - return inbox + if domain not in self['relay-list']: + if fail: + raise KeyError(domain) + + return + + return self['relay-list'][domain] - def add_inbox(self, inbox): - assert inbox.startswith('https') - assert not self.get_inbox(inbox) + def add_inbox(self, inbox, followid=None, fail=False): + assert inbox.startswith('https'), 'Inbox must be a url' + domain = urlparse(inbox).hostname - self.data['relay-list'].append(inbox) + if self.get_inbox(domain): + if fail: + raise KeyError(domain) + + return False + + self['relay-list'][domain] = { + 'domain': domain, + 'inbox': inbox, + 'followid': followid + } + + logging.verbose(f'Added inbox to database: {inbox}') + return self['relay-list'][domain] - def del_inbox(self, inbox_url): - inbox = self.get_inbox(inbox_url) + def del_inbox(self, domain, followid=None, fail=False): + data = 
self.get_inbox(domain, fail=False) - if not inbox: - raise KeyError(inbox_url) + if not data: + if fail: + raise KeyError(domain) - self.data['relay-list'].remove(inbox) + return False + + if not data['followid'] or not followid or data['followid'] == followid: + del self['relay-list'][data['domain']] + logging.verbose(f'Removed inbox from database: {data["inbox"]}') + return True + + if fail: + raise ValueError('Follow IDs do not match') + + logging.debug(f'Follow ID does not match: db = {data["followid"]}, object = {followid}') + return False + + + def set_followid(self, domain, followid): + data = self.get_inbox(domain, fail=True) + data['followid'] = followid + + + def get_request(self, domain, fail=True): + if domain.startswith('http'): + domain = urlparse(domain).hostname + + try: + return self['follow-requests'][domain] + + except KeyError as e: + if fail: + raise e + + + def add_request(self, actor, inbox, followid): + domain = urlparse(inbox).hostname + + try: + request = self.get_request(domain) + request['followid'] = followid + + except KeyError: + pass + + self['follow-requests'][domain] = { + 'actor': actor, + 'inbox': inbox, + 'followid': followid + } + + + def del_request(self, domain): + if domain.startswith('http'): + domain = urlparse(inbox).hostname + + del self['follow-requests'][domain] diff --git a/relay/manage.py b/relay/manage.py index 4aae7c7..2031cb8 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -1,17 +1,17 @@ import Crypto import asyncio import click -import json import logging -import os import platform -from aiohttp.web import AppRunner, TCPSite -from cachetools import LRUCache +from urllib.parse import urlparse -from . import app, misc, views, __version__ -from .config import DotDict, RelayConfig, relay_software_names -from .database import RelayDatabase +from . 
import misc, __version__ +from .application import Application +from .config import relay_software_names + + +app = None @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True) @@ -19,23 +19,11 @@ from .database import RelayDatabase @click.version_option(version=__version__, prog_name='ActivityRelay') @click.pass_context def cli(ctx, config): - app['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) - app['config'] = RelayConfig(config, app['is_docker']) - - if not app['config'].load(): - app['config'].save() - - app['database'] = RelayDatabase(app['config']) - app['database'].load() - - app['cache'] = DotDict() - app['semaphore'] = asyncio.Semaphore(app['config']['push_limit']) - - for key in app['config'].cachekeys: - app['cache'][key] = LRUCache(app['config'][key]) + global app + app = Application(config) if not ctx.invoked_subcommand: - if app['config'].host.endswith('example.com'): + if app.config.host.endswith('example.com'): relay_setup.callback() else: @@ -55,7 +43,7 @@ def cli_inbox_list(): click.echo('Connected to the following instances or relays:') - for inbox in app['database'].inboxes: + for inbox in app.database.inboxes: click.echo(f'- {inbox}') @@ -64,29 +52,30 @@ def cli_inbox_list(): def cli_inbox_follow(actor): 'Follow an actor (Relay must be running)' - config = app['config'] - database = app['database'] - - if config.is_banned(actor): + if app.config.is_banned(actor): return click.echo(f'Error: Refusing to follow banned actor: {actor}') if not actor.startswith('http'): + domain = actor actor = f'https://{actor}/actor' - if database.get_inbox(actor): - return click.echo(f'Error: Already following actor: {actor}') + else: + domain = urlparse(actor).hostname - actor_data = run_in_loop(misc.request, actor, sign_headers=True) + try: + inbox_data = app.database['relay-list'][domain] + inbox = inbox_data['inbox'] - if not actor_data: - return click.echo(f'Error: Failed to fetch actor: {actor}') + except KeyError: + 
actor_data = asyncio.run(misc.request(actor)) + inbox = actor_data.shared_inbox - inbox = misc.get_actor_inbox(actor_data) + message = misc.Message.new_follow( + host = app.config.host, + actor = actor.id + ) - database.add_inbox(inbox) - database.save() - - run_in_loop(misc.follow_remote_actor, actor) + asyncio.run(misc.request(inbox, message)) click.echo(f'Sent follow message to actor: {actor}') @@ -95,18 +84,36 @@ def cli_inbox_follow(actor): def cli_inbox_unfollow(actor): 'Unfollow an actor (Relay must be running)' - database = app['database'] - if not actor.startswith('http'): + domain = actor actor = f'https://{actor}/actor' - if not database.get_inbox(actor): - return click.echo(f'Error: Not following actor: {actor}') + else: + domain = urlparse(actor).hostname - database.del_inbox(actor) - database.save() + try: + inbox_data = app.database['relay-list'][domain] + inbox = inbox_data['inbox'] + message = misc.Message.new_unfollow( + host = app.config.host, + actor = actor, + follow = inbox_data['followid'] + ) - run_in_loop(misc.unfollow_remote_actor, actor) + except KeyError: + actor_data = asyncio.run(misc.request(actor)) + inbox = actor_data.shared_inbox + message = misc.Message.new_unfollow( + host = app.config.host, + actor = actor, + follow = { + 'type': 'Follow', + 'object': actor, + 'actor': f'https://{app.config.host}/actor' + } + ) + + asyncio.run(misc.request(inbox, message)) click.echo(f'Sent unfollow message to: {actor}') @@ -115,23 +122,17 @@ def cli_inbox_unfollow(actor): def cli_inbox_add(inbox): 'Add an inbox to the database' - database = app['database'] - config = app['config'] - if not inbox.startswith('http'): inbox = f'https://{inbox}/inbox' - if database.get_inbox(inbox): - click.echo(f'Error: Inbox already in database: {inbox}') - return + if app.config.is_banned(inbox): + return click.echo(f'Error: Refusing to add banned inbox: {inbox}') - if config.is_banned(inbox): - click.echo(f'Error: Refusing to add banned inbox: {inbox}') - 
return + if app.database.add_inbox(inbox): + app.database.save() + return click.echo(f'Added inbox to the database: {inbox}') - database.add_inbox(inbox) - database.save() - click.echo(f'Added inbox to the database: {inbox}') + click.echo(f'Error: Inbox already in database: {inbox}') @cli_inbox.command('remove') @@ -139,15 +140,16 @@ def cli_inbox_add(inbox): def cli_inbox_remove(inbox): 'Remove an inbox from the database' - database = app['database'] - dbinbox = database.get_inbox(inbox) + try: + dbinbox = app.database.get_inbox(inbox, fail=True) - if not dbinbox: + except KeyError: click.echo(f'Error: Inbox does not exist: {inbox}') return - database.del_inbox(dbinbox) - database.save() + app.database.del_inbox(dbinbox['domain']) + app.database.save() + click.echo(f'Removed inbox from the database: {inbox}') @@ -163,7 +165,7 @@ def cli_instance_list(): click.echo('Banned instances or relays:') - for domain in app['config'].blocked_instances: + for domain in app.config.blocked_instances: click.echo(f'- {domain}') @@ -172,16 +174,14 @@ def cli_instance_list(): def cli_instance_ban(target): 'Ban an instance and remove the associated inbox if it exists' - config = app['config'] - database = app['database'] - inbox = database.get_inbox(target) + if target.startswith('http'): + target = urlparse(target).hostname - if config.ban_instance(target): - config.save() + if app.config.ban_instance(target): + app.config.save() - if inbox: - database.del_inbox(inbox) - database.save() + if app.database.del_inbox(target): + app.database.save() click.echo(f'Banned instance: {target}') return @@ -194,10 +194,8 @@ def cli_instance_ban(target): def cli_instance_unban(target): 'Unban an instance' - config = app['config'] - - if config.unban_instance(target): - config.save() + if app.config.unban_instance(target): + app.config.save() click.echo(f'Unbanned instance: {target}') return @@ -217,7 +215,7 @@ def cli_software_list(): click.echo('Banned software:') - for software in 
app['config'].blocked_software: + for software in app.config.blocked_software: click.echo(f'- {software}') @@ -229,17 +227,15 @@ def cli_software_list(): def cli_software_ban(name, fetch_nodeinfo): 'Ban software. Use RELAYS for NAME to ban relays' - config = app['config'] - if name == 'RELAYS': for name in relay_software_names: - config.ban_software(name) + app.config.ban_software(name) - config.save() + app.config.save() return click.echo('Banned all relay software') if fetch_nodeinfo: - software = run_in_loop(fetch_nodeinfo, name) + software = asyncio.run(misc.fetch_nodeinfo(name)) if not software: click.echo(f'Failed to fetch software name from domain: {name}') @@ -247,7 +243,7 @@ def cli_software_ban(name, fetch_nodeinfo): name = software if config.ban_software(name): - config.save() + app.config.save() return click.echo(f'Banned software: {name}') click.echo(f'Software already banned: {name}') @@ -261,25 +257,23 @@ def cli_software_ban(name, fetch_nodeinfo): def cli_software_unban(name, fetch_nodeinfo): 'Ban software. 
Use RELAYS for NAME to unban relays' - config = app['config'] - if name == 'RELAYS': for name in relay_software_names: - config.unban_software(name) + app.config.unban_software(name) config.save() return click.echo('Unbanned all relay software') if fetch_nodeinfo: - software = run_in_loop(fetch_nodeinfo, name) + software = asyncio.run(misc.fetch_nodeinfo(name)) if not software: click.echo(f'Failed to fetch software name from domain: {name}') name = software - if config.unban_software(name): - config.save() + if app.config.unban_software(name): + app.config.save() return click.echo(f'Unbanned software: {name}') click.echo(f'Software wasn\'t banned: {name}') @@ -296,7 +290,7 @@ def cli_whitelist(): def cli_whitelist_list(): click.echo('Current whitelisted domains') - for domain in app['config'].whitelist: + for domain in app.config.whitelist: click.echo(f'- {domain}') @@ -305,12 +299,10 @@ def cli_whitelist_list(): def cli_whitelist_add(instance): 'Add an instance to the whitelist' - config = app['config'] - - if not config.add_whitelist(instance): + if not app.config.add_whitelist(instance): return click.echo(f'Instance already in the whitelist: {instance}') - config.save() + app.config.save() click.echo(f'Instance added to the whitelist: {instance}') @@ -319,18 +311,14 @@ def cli_whitelist_add(instance): def cli_whitelist_remove(instance): 'Remove an instance from the whitelist' - config = app['config'] - database = app['database'] - inbox = database.get_inbox(instance) - - if not config.del_whitelist(instance): + if not app.config.del_whitelist(instance): return click.echo(f'Instance not in the whitelist: {instance}') - config.save() + app.config.save() - if inbox and config.whitelist_enabled: - database.del_inbox(inbox) - database.save() + if app.config.whitelist_enabled: + if app.database.del_inbox(inbox): + app.database.save() click.echo(f'Removed instance from the whitelist: {instance}') @@ -339,23 +327,21 @@ def cli_whitelist_remove(instance): def 
relay_setup(): 'Generate a new config' - config = app['config'] - while True: - config.host = click.prompt('What domain will the relay be hosted on?', default=config.host) + app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host) if not config.host.endswith('example.com'): break click.echo('The domain must not be example.com') - config.listen = click.prompt('Which address should the relay listen on?', default=config.listen) + app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen) while True: - config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int) + app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int) break - config.save() + app.config.save() if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'): relay_run.callback() @@ -365,9 +351,7 @@ def relay_setup(): def relay_run(): 'Run the relay' - config = app['config'] - - if config.host.endswith('example.com'): + if app.config.host.endswith('example.com'): return click.echo('Relay is not set up. 
Please edit your relay config or run "activityrelay setup".') vers_split = platform.python_version().split('.') @@ -382,43 +366,10 @@ def relay_run(): click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome') return click.echo(pip_command) - if not misc.check_open_port(config.listen, config.port): - return click.echo(f'Error: A server is already running on port {config.port}') + if not misc.check_open_port(app.config.listen, app.config.port): + return click.echo(f'Error: A server is already running on port {app.config.port}') - # web pages - app.router.add_get('/', views.home) - - # endpoints - app.router.add_post('/actor', views.inbox) - app.router.add_post('/inbox', views.inbox) - app.router.add_get('/actor', views.actor) - app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0) - app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown) - app.router.add_get('/.well-known/webfinger', views.webfinger) - - if logging.DEBUG >= logging.root.level: - app.router.add_get('/stats', views.stats) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - asyncio.ensure_future(handle_start_webserver(), loop=loop) - loop.run_forever() - - -def run_in_loop(func, *args, **kwargs): - loop = asyncio.new_event_loop() - return loop.run_until_complete(func(*args, **kwargs)) - - -async def handle_start_webserver(): - config = app['config'] - runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"') - - logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})') - await runner.setup() - - site = TCPSite(runner, config.listen, config.port) - await site.start() + app.run() def main(): diff --git a/relay/misc.py b/relay/misc.py index 50a575b..e5f362e 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -4,26 +4,47 @@ import json import logging import socket import traceback +import uuid from Crypto.Hash import SHA, SHA256, SHA512 from Crypto.PublicKey import RSA from 
Crypto.Signature import PKCS1_v1_5 from aiohttp import ClientSession +from aiohttp.hdrs import METH_ALL as METHODS +from aiohttp.web import Response as AiohttpResponse, View as AiohttpView from datetime import datetime from json.decoder import JSONDecodeError from urllib.parse import urlparse from uuid import uuid4 -from . import app from .http_debug import http_debug +app = None + HASHES = { 'sha1': SHA, 'sha256': SHA256, 'sha512': SHA512 } +MIMETYPES = { + 'activity': 'application/activity+json', + 'html': 'text/html', + 'json': 'application/json', + 'text': 'text/plain' +} + +NODEINFO_NS = { + '20': 'http://nodeinfo.diaspora.software/ns/schema/2.0', + '21': 'http://nodeinfo.diaspora.software/ns/schema/2.1' +} + + +def set_app(new_app): + global app + app = new_app + def build_signing_string(headers, used_headers): return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers)) @@ -47,56 +68,35 @@ def create_signature_header(headers): sigstring = build_signing_string(headers, used_headers) sig = { - 'keyId': app['config'].keyid, + 'keyId': app.config.keyid, 'algorithm': 'rsa-sha256', 'headers': ' '.join(used_headers), - 'signature': sign_signing_string(sigstring, app['database'].PRIVKEY) + 'signature': sign_signing_string(sigstring, app.database.PRIVKEY) } chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()] return ','.join(chunks) -def distill_object_id(activity): - logging.debug(f'>> determining object ID for {activity["object"]}') - - try: - return activity['object']['id'] - - except TypeError: - return activity['object'] - - def distill_inboxes(actor, object_id): - database = app['database'] - origin_hostname = urlparse(object_id).hostname - actor_inbox = get_actor_inbox(actor) - targets = [] - - for inbox in database.inboxes: - if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname: - targets.append(inbox) - - return targets + for inbox in app.database.inboxes: + if inbox != actor.shared_inbox and 
urlparse(inbox).hostname != urlparse(object_id).hostname: + yield inbox def generate_body_digest(body): - bodyhash = app['cache'].digests.get(body) + bodyhash = app.cache.digests.get(body) if bodyhash: return bodyhash h = SHA256.new(body.encode('utf-8')) bodyhash = base64.b64encode(h.digest()).decode('utf-8') - app['cache'].digests[body] = bodyhash + app.cache.digests[body] = bodyhash return bodyhash -def get_actor_inbox(actor): - return actor.get('endpoints', {}).get('sharedInbox', actor['inbox']) - - def sign_signing_string(sigstring, key): pkcs = PKCS1_v1_5.new(key) h = SHA256.new() @@ -106,20 +106,6 @@ def sign_signing_string(sigstring, key): return base64.b64encode(sigdata).decode('utf-8') -def split_signature(sig): - default = {"headers": "date"} - - sig = sig.strip().split(',') - - for chunk in sig: - k, _, v = chunk.partition('=') - v = v.strip('\"') - default[k] = v - - default['headers'] = default['headers'].split() - return default - - async def fetch_actor_key(actor): actor_data = await request(actor) @@ -135,103 +121,52 @@ async def fetch_actor_key(actor): async def fetch_nodeinfo(domain): nodeinfo_url = None - wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False) if not wk_nodeinfo: return - for link in wk_nodeinfo.get('links', ''): - if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0': - nodeinfo_url = link['href'] - break + wk_nodeinfo = WKNodeinfo(wk_nodeinfo) + + for version in ['20', '21']: + try: + nodeinfo_url = wk_nodeinfo.get_url(version) + + except KeyError: + pass if not nodeinfo_url: - return + logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}') + return False - nodeinfo_data = await request(nodeinfo_url, sign_headers=False, activity=False) + nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False) try: - return nodeinfo_data['software']['name'] + return nodeinfo['software']['name'] except KeyError: return False -async def 
follow_remote_actor(actor_uri): - config = app['config'] - - actor = await request(actor_uri) - inbox = get_actor_inbox(actor) - - if not actor: - logging.error(f'failed to fetch actor at: {actor_uri}') - return - - logging.verbose(f'sending follow request: {actor_uri}') - - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Follow", - "to": [actor['id']], - "object": actor['id'], - "id": f"https://{config.host}/activities/{uuid4()}", - "actor": f"https://{config.host}/actor" - } - - await request(inbox, message) - - -async def unfollow_remote_actor(actor_uri): - config = app['config'] - - actor = await request(actor_uri) - - if not actor: - logging.error(f'failed to fetch actor: {actor_uri}') - return - - inbox = get_actor_inbox(actor) - logging.verbose(f'sending unfollow request to inbox: {inbox}') - - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Undo", - "to": [actor_uri], - "object": { - "type": "Follow", - "object": actor_uri, - "actor": actor_uri, - "id": f"https://{config.host}/activities/{uuid4()}" - }, - "id": f"https://{config.host}/activities/{uuid4()}", - "actor": f"https://{config.host}/actor" - } - - await request(inbox, message) - - async def request(uri, data=None, force=False, sign_headers=True, activity=True): ## If a get request and not force, try to use the cache first if not data and not force: try: - return app['cache'].json[uri] + return app.cache.json[uri] except KeyError: pass url = urlparse(uri) method = 'POST' if data else 'GET' - headers = {'User-Agent': 'ActivityRelay'} - mimetype = 'application/activity+json' if activity else 'application/json' + action = data.get('type') if data else None + headers = { + 'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9', + 'User-Agent': 'ActivityRelay', + } - ## Set the content type for a POST - if data and 'Content-Type' not in headers: - headers['Content-Type'] = mimetype - - ## Set the accepted content type for a GET - elif 
not data and 'Accept' not in headers: - headers['Accept'] = mimetype + if data: + headers['Content-Type'] = MIMETYPES['activity' if activity else 'json'] if sign_headers: signing_headers = { @@ -243,7 +178,6 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) if data: assert isinstance(data, dict) - action = data.get('type') data = json.dumps(data) signing_headers.update({ 'Digest': f'SHA-256={generate_body_digest(data)}', @@ -258,26 +192,42 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) headers.update(signing_headers) try: - # json_serializer=DotDict maybe? - async with ClientSession(trace_configs=http_debug()) as session, app['semaphore']: + if data: + logging.verbose(f'Sending "{action}" to inbox: {uri}') + + else: + logging.verbose(f'Sending GET request to url: {uri}') + + async with ClientSession(trace_configs=http_debug()) as session, app.semaphore: async with session.request(method, uri, headers=headers, data=data) as resp: ## aiohttp has been known to leak if the response hasn't been read, ## so we're just gonna read the request no matter what resp_data = await resp.read() - resp_payload = json.loads(resp_data.decode('utf-8')) - if resp.status not in [200, 202]: - if not data: - logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}') - return - - logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}') + ## Not expecting a response, so just return + if resp.status == 202: return - logging.debug(f'{uri} >> resp {resp_payload}') + elif resp.status != 200: + if not resp_data: + return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') - app['cache'].json[uri] = resp_payload - return resp_payload + return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') + + if resp.content_type == MIMETYPES['activity']: + resp_data = await 
resp.json(loads=Message.new_from_json) + + elif resp.content_type == MIMETYPES['json']: + resp_data = await resp.json(loads=DotDict.new_from_json) + + else: + logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp_data}') + + logging.debug(f'{uri} >> resp {resp_data}') + + app.cache.json[uri] = resp_data + return resp_data except JSONDecodeError: return @@ -286,29 +236,19 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) traceback.print_exc() -async def validate_signature(actor, http_request): - pubkey = await fetch_actor_key(actor) - - if not pubkey: - return False - - logging.debug(f'actor key: {pubkey}') - +async def validate_signature(actor, signature, http_request): headers = {key.lower(): value for key, value in http_request.headers.items()} headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path]) - sig = split_signature(headers['signature']) - logging.debug(f'sigdata: {sig}') - - sigstring = build_signing_string(headers, sig['headers']) + sigstring = build_signing_string(headers, signature['headers']) logging.debug(f'sigstring: {sigstring}') - sign_alg, _, hash_alg = sig['algorithm'].partition('-') + sign_alg, _, hash_alg = signature['algorithm'].partition('-') logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}') - sigdata = base64.b64decode(sig['signature']) + sigdata = base64.b64decode(signature['signature']) - pkcs = PKCS1_v1_5.new(pubkey) + pkcs = PKCS1_v1_5.new(actor.PUBKEY) h = HASHES[hash_alg].new() h.update(sigstring.encode('ascii')) result = pkcs.verify(h, sigdata) @@ -317,3 +257,294 @@ async def validate_signature(actor, http_request): logging.debug(f'validates? 
class DotDict(dict):
	"""A dict whose keys are also accessible as attributes (``d.key``).

	Values that are plain dicts are recursively wrapped on assignment so
	nested lookups also support attribute access.
	"""

	def __init__(self, _data, **kwargs):
		dict.__init__(self)

		# route everything through update() so nested dicts get wrapped
		self.update(_data, **kwargs)


	def __getattr__(self, k):
		try:
			return self[k]

		except KeyError:
			raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None


	def __setattr__(self, k, v):
		# names starting with '_' stay real attributes; everything else is a key
		if k.startswith('_'):
			super().__setattr__(k, v)

		else:
			self[k] = v


	def __setitem__(self, k, v):
		# wrap only exact dicts; subclasses (e.g. an already-wrapped DotDict)
		# are stored as-is, hence type() rather than isinstance()
		if type(v) == dict:
			v = DotDict(v)

		super().__setitem__(k, v)


	def __delattr__(self, k):
		try:
			dict.__delitem__(self, k)

		except KeyError:
			raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None


	@classmethod
	def new_from_json(cls, data):
		"""Parse a JSON document into a DotDict.

		Raises JSONDecodeError for an empty or syntactically invalid body.
		"""
		if not data:
			raise JSONDecodeError('Empty body', data, 1)

		try:
			return cls(json.loads(data))

		except ValueError:
			raise JSONDecodeError('Invalid body', data, 1)


	@classmethod
	def new_from_signature(cls, sig):
		"""Parse an HTTP ``Signature`` header value into a DotDict.

		The header is a comma-separated list of ``key="value"`` pairs; the
		``headers`` value is further split into a list of header names.
		"""
		data = cls({})

		for chunk in sig.strip().split(','):
			key, value = chunk.split('=', 1)
			value = value.strip('\"')

			if key == 'headers':
				value = value.split()

			data[key.lower()] = value

		return data


	def to_json(self, indent=None):
		"""Serialize this mapping back to a JSON string."""
		return json.dumps(self, indent=indent)


	def update(self, _data, **kwargs):
		# accept a mapping or an iterable of (key, value) pairs, plus kwargs;
		# assignment goes through __setitem__ so nested dicts are wrapped
		if isinstance(_data, dict):
			for key, value in _data.items():
				self[key] = value

		elif isinstance(_data, (list, tuple, set)):
			for key, value in _data:
				self[key] = value

		for key, value in kwargs.items():
			self[key] = value


class Message(DotDict):
	"""An ActivityPub message with constructors for the activities the relay sends."""

	@classmethod
	def new_actor(cls, host, pubkey, description=None):
		"""Build the relay's own Application actor document."""
		return cls({
			'@context': 'https://www.w3.org/ns/activitystreams',
			'id': f'https://{host}/actor',
			'type': 'Application',
			'preferredUsername': 'relay',
			'name': 'ActivityRelay',
			'summary': description or 'ActivityRelay bot',
			'followers': f'https://{host}/followers',
			'following': f'https://{host}/following',
			'inbox': f'https://{host}/inbox',
			'url': f'https://{host}/inbox',
			'endpoints': {
				'sharedInbox': f'https://{host}/inbox'
			},
			'publicKey': {
				'id': f'https://{host}/actor#main-key',
				'owner': f'https://{host}/actor',
				'publicKeyPem': pubkey
			}
		})


	@classmethod
	def new_announce(cls, host, object):
		"""Build an Announce of ``object`` addressed to the relay's followers."""
		return cls({
			'@context': 'https://www.w3.org/ns/activitystreams',
			'id': f'https://{host}/activities/{uuid.uuid4()}',
			'type': 'Announce',
			'to': [f'https://{host}/followers'],
			'actor': f'https://{host}/actor',
			'object': object
		})


	@classmethod
	def new_follow(cls, host, actor):
		"""Build a Follow request targeting ``actor``."""
		return cls({
			'@context': 'https://www.w3.org/ns/activitystreams',
			'type': 'Follow',
			'to': [actor],
			'object': actor,
			'id': f'https://{host}/activities/{uuid.uuid4()}',
			'actor': f'https://{host}/actor'
		})


	@classmethod
	def new_unfollow(cls, host, actor, follow):
		"""Build an Undo of a previous ``follow`` activity."""
		return cls({
			'@context': 'https://www.w3.org/ns/activitystreams',
			'id': f'https://{host}/activities/{uuid.uuid4()}',
			'type': 'Undo',
			'to': [actor],
			'actor': f'https://{host}/actor',
			'object': follow
		})


	@classmethod
	def new_response(cls, host, actor, followid, accept):
		"""Build an Accept (or Reject) for the Follow identified by ``followid``."""
		return cls({
			'@context': 'https://www.w3.org/ns/activitystreams',
			'id': f'https://{host}/activities/{uuid.uuid4()}',
			'type': 'Accept' if accept else 'Reject',
			'to': [actor],
			'actor': f'https://{host}/actor',
			'object': {
				'id': followid,
				'type': 'Follow',
				'object': f'https://{host}/actor',
				'actor': actor
			}
		})


	# misc properties
	@property
	def domain(self):
		# hostname of the message's own id
		return urlparse(self.id).hostname


	# actor properties
	@property
	def PUBKEY(self):
		# parsed RSA key object for signature verification
		return RSA.import_key(self.pubkey)


	@property
	def pubkey(self):
		return self.publicKey.publicKeyPem


	@property
	def shared_inbox(self):
		# prefer the shared inbox endpoint, fall back to the personal inbox
		return self.get('endpoints', {}).get('sharedInbox', self.inbox)


	# activity properties
	@property
	def actorid(self):
		# 'actor' may be an embedded object or a bare id string
		if isinstance(self.actor, dict):
			return self.actor.id

		return self.actor


	@property
	def objectid(self):
		# 'object' may be an embedded object or a bare id string
		if isinstance(self.object, dict):
			return self.object.id

		return self.object


class Response(AiohttpResponse):
	"""aiohttp Response with helpers for the relay's common content types."""

	@classmethod
	def new(cls, body='', status=200, headers=None, ctype='text'):
		"""Build a response; ``ctype`` is a key into MIMETYPES.

		bytes bodies are sent raw, dict bodies are JSON-encoded for the
		'json'/'activity' types, everything else is sent as text.
		"""
		kwargs = {
			'status': status,
			'headers': headers,
			'content_type': MIMETYPES[ctype]
		}

		if isinstance(body, bytes):
			kwargs['body'] = body

		elif isinstance(body, dict) and ctype in {'json', 'activity'}:
			kwargs['text'] = json.dumps(body)

		else:
			kwargs['text'] = body

		return cls(**kwargs)


	@classmethod
	def new_error(cls, status, body, ctype='text'):
		"""Build an error response; JSON errors get a {'status', 'error'} envelope."""
		if ctype == 'json':
			body = json.dumps({'status': status, 'error': body})

		return cls.new(body=body, status=status, ctype=ctype)


	@property
	def location(self):
		return self.headers.get('Location')


	@location.setter
	def location(self, value):
		self.headers['Location'] = value


class View(AiohttpView):
	"""aiohttp class-based view that passes URL match groups as kwargs."""

	async def _iter(self):
		if self.request.method not in METHODS:
			self._raise_allowed_methods()

		method = getattr(self, self.request.method.lower(), None)

		if method is None:
			self._raise_allowed_methods()

		# unlike the stock implementation, forward the path parameters
		return await method(**self.request.match_info)


	@property
	def app(self):
		return self._request.app


	@property
	def cache(self):
		return self.app.cache


	@property
	def config(self):
		return self.app.config


	@property
	def database(self):
		return self.app.database


class WKNodeinfo(DotDict):
	"""The ``.well-known/nodeinfo`` discovery document."""

	@classmethod
	def new(cls, v20, v21):
		"""Build the document from the URLs of the 2.0 and 2.1 nodeinfo endpoints."""
		return cls({
			'links': [
				{'rel': NODEINFO_NS['20'], 'href': v20},
				{'rel': NODEINFO_NS['21'], 'href': v21}
			]
		})


	def get_url(self, version='20'):
		"""Return the href for the requested nodeinfo version; KeyError if absent."""
		for item in self.links:
			if item['rel'] == NODEINFO_NS[version]:
				return item['href']

		raise KeyError(version)
async def handle_relay(request, actor, data, software):
	"""Announce an incoming post to all subscribed inboxes (deduplicated by object id)."""
	if data.objectid in request.app.cache.objects:
		logging.verbose(f'already relayed {data.objectid}')
		return

	logging.verbose(f'Relaying post from {data.actorid}')

	message = misc.Message.new_announce(
		host = request.app.config.host,
		object = data.objectid
	)

	logging.debug(f'>> relay: {message}')

	inboxes = misc.distill_inboxes(actor, data.objectid)
	futures = [misc.request(inbox, data=message) for inbox in inboxes]

	# fire-and-forget delivery; the handler does not wait on the pushes
	asyncio.ensure_future(asyncio.gather(*futures))
	request.app.cache.objects[data.objectid] = message.id


async def handle_forward(request, actor, data, software):
	"""Forward a non-post activity to subscribers as an Announce (deduplicated by activity id)."""
	if data.id in request.app.cache.objects:
		logging.verbose(f'already forwarded {data.id}')
		return

	message = misc.Message.new_announce(
		host = request.app.config.host,
		object = data
	)

	logging.verbose(f'Forwarding post from {actor.id}')
	logging.debug(f'>> Relay {data}')

	inboxes = misc.distill_inboxes(actor, data.id)
	futures = [misc.request(inbox, data=message) for inbox in inboxes]

	# fire-and-forget delivery; the handler does not wait on the pushes
	asyncio.ensure_future(asyncio.gather(*futures))
	request.app.cache.objects[data.id] = message.id


async def handle_follow(request, actor, data, software):
	"""Accept a Follow, register the actor's inbox, and follow back when expected."""
	# add_inbox returns falsy when the inbox is already known; in that case
	# just refresh the stored follow id
	if not request.app.database.add_inbox(actor.shared_inbox, data.id):
		request.app.database.set_followid(actor.id, data.id)

	request.app.database.save()

	await misc.request(
		actor.shared_inbox,
		misc.Message.new_response(
			host = request.app.config.host,
			actor = actor.id,
			followid = data.id,
			accept = True
		)
	)

	# Are Akkoma and Pleroma the only two that expect a follow back?
	# Ignoring only Mastodon for now
	if software != 'mastodon':
		await misc.request(
			actor.shared_inbox,
			misc.Message.new_follow(
				host = request.app.config.host,
				actor = actor.id
			)
		)


async def handle_undo(request, actor, data, software):
	"""Process an Undo: unfollows remove the inbox, anything else is forwarded."""
	## If the object is not a Follow, forward it
	if data['object']['type'] != 'Follow':
		return await handle_forward(request, actor, data, software)

	# nothing to do when the inbox was not registered
	if not request.app.database.del_inbox(actor.domain, data.id):
		return

	request.app.database.save()

	message = misc.Message.new_unfollow(
		host = request.app.config.host,
		actor = actor.id,
		follow = data
	)

	await misc.request(actor.shared_inbox, message)


# NOTE(review): the original mapping's entries fall inside an elided diff hunk;
# these entries are reconstructed from the handlers above — confirm against upstream.
processors = {
	'Announce': handle_relay,
	'Create': handle_relay,
	'Delete': handle_forward,
	'Follow': handle_follow,
	'Undo': handle_undo,
	'Update': handle_forward
}


async def run_processor(request, actor, data, software):
	"""Dispatch an incoming activity to its handler; unknown types are ignored."""
	if data.type not in processors:
		return

	logging.verbose(f'New "{data.type}" from actor: {actor.id}')
	return await processors[data.type](request, actor, data, software)
from . import __version__, misc
from .http_debug import STATS
from .misc import DotDict, Message, Response, WKNodeinfo
from .processors import run_processor


# routes registered via @register_route, consumed by the application setup
routes = []
version = __version__


# only ask git for a commit label when running from a checkout
if Path(__file__).parent.parent.joinpath('.git').exists():
	try:
		commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii')
		version = f'{__version__} {commit_label}'

	# narrowed from a bare except: a failed "git rev-parse" only means the
	# version string keeps no commit label
	except Exception:
		pass


def register_route(method, path):
	"""Decorator factory: record (method, path, handler) in the module-level routes list.

	Returns the handler unchanged so it stays directly callable.
	"""
	def wrapper(func):
		routes.append([method, path, func])
		return func

	return wrapper
'.join(app['database'].hostnames) - text = """ + targets = '
'.join(request.app.database.hostnames) + note = request.app.config.note + count = len(request.app.database.hostnames) + host = request.app.config.host + + text = f""" ActivityPub Relay at {host}