diff --git a/docs/commands.md b/docs/commands.md
index 6e7da54..aea0a82 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -24,6 +24,27 @@ Run the setup wizard to configure your relay.
 	activityrelay setup


+## Config
+
+Manage the relay config.
+
+	activityrelay config
+
+
+### List
+
+List the current config key/value pairs.
+
+	activityrelay config list
+
+
+### Set
+
+Set a value for a config option.
+
+	activityrelay config set
+
+
 ## Inbox

 Manage the list of subscribed instances.
@@ -97,6 +118,13 @@ Remove a domain from the whitelist.
 	activityrelay whitelist remove


+### Import
+
+Add all current inboxes to the whitelist.
+
+	activityrelay whitelist import
+
+
 ## Instance

 Manage the instance ban list.
diff --git a/docs/configuration.md b/docs/configuration.md
index ecadac3..d52ca9e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1,6 +1,8 @@
 # Configuration

-## DB
+## General
+
+### DB

 The path to the database. It contains the relay actor private key and all subscribed
 instances. If the path is not absolute, it is relative to the working directory.
@@ -8,7 +10,7 @@ instances. If the path is not absolute, it is relative to the working directory.
 	db: relay.jsonld


-## Listener
+### Listener

 The address and port the relay will listen on. If the reverse proxy (nginx, apache, caddy, etc)
 is running on the same host, it is recommended to change `listen` to `localhost`
@@ -17,22 +19,41 @@ is running on the same host, it is recommended to change `listen` to `localhost`
 	port: 8080


-## Note
+### Note

 A small blurb to describe your relay instance. This will show up on the relay's home page.

 	note: "Make a note about your instance here."


-## Post Limit
+### Post Limit

 The maximum number of messages to send out at once. For each incoming message, a message will be
 sent out to every subscribed instance minus the instance which sent the message. This limit
 is to prevent too many outgoing connections from being made, so adjust if necessary.

+Note: If the `workers` option is set to anything above 0, this limit applies per worker.
+
 	push_limit: 512


+### Push Workers
+
+The relay can be configured to use threads to push messages out. For smaller relays, this isn't
+necessary, but bigger ones (>100 instances) will want to set this to the number of available CPU
+threads.
+
+	workers: 0
+
+
+### JSON GET Cache Limit
+
+JSON objects (actors, nodeinfo, etc.) get cached when fetched. This sets the maximum number of
+objects to keep in the cache.
+
+	json_cache: 1024
+
+
 ## AP

 Various ActivityPub-related settings
@@ -82,29 +103,3 @@ setting this to the below list will block all other relays and prevent relay cha
 - aoderelay
 - social.seattle.wa.us-relay
 - unciarelay
-
-
-## Cache
-
-These are object limits for various caches. Only change if you know what you're doing.
-
-
-### Objects
-
-The urls of messages which have been processed by the relay.
-
-	objects: 1024
-
-
-### Actors
-
-The ActivityPub actors of incoming messages.
-
-	actors: 1024
-
-
-### Actors
-
-The base64 encoded hashes of messages.
-
-	digests: 1024
diff --git a/docs/installation.md b/docs/installation.md
index a852ab1..8363faa 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -15,7 +15,7 @@ the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in-
 Now simply install ActivityRelay directly from git

-	pipx install git+https://git.pleroma.social/pleroma/relay@0.2.3
+	pipx install git+https://git.pleroma.social/pleroma/relay@0.2.4

 Or from a cloned git repo.
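For a cloned repo, the equivalent steps would look something like this (illustrative only; the clone URL is the repository referenced above, and `pipx install .` assumes the command is run from inside the checkout):

	git clone https://git.pleroma.social/pleroma/relay
	cd relay
	pipx install .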
@@ -39,7 +39,7 @@ be installed via [pyenv](https://github.com/pyenv/pyenv).
 The instructions for installation via pip are very similar to pipx. Installation can be
 done from git

-	python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.3
+	python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.4

 or a cloned git repo.
diff --git a/relay.yaml.example b/relay.yaml.example
index d123e08..4e35697 100644
--- a/relay.yaml.example
+++ b/relay.yaml.example
@@ -9,13 +9,19 @@ port: 8080
 # Note
 note: "Make a note about your instance here."

-# maximum number of inbox posts to do at once
+# Number of worker threads to start. If 0, use asyncio futures instead of threads.
+workers: 0
+
+# Maximum number of inbox posts to do at once
+# If workers is set to 1 or above, this is the max for each worker
 push_limit: 512

-# this section is for ActivityPub
+# The maximum number of JSON objects to cache from GET requests
+json_cache: 1024
+
 ap:
-	# this is used for generating activitypub messages, as well as instructions for
-	# linking AP identities. it should be an SSL-enabled domain reachable by https.
+	# This is used for generating activitypub messages, as well as instructions for
+	# linking AP identities. It should be an SSL-enabled domain reachable by https.
 	host: 'relay.example.com'

 	blocked_instances:
@@ -35,9 +41,3 @@ ap:
 	#- 'aoderelay'
 	#- 'social.seattle.wa.us-relay'
 	#- 'unciarelay'
-
-# cache limits as number of items. only change this if you know what you're doing
-cache:
-	objects: 1024
-	json: 1024
-	digests: 1024
diff --git a/relay/__init__.py b/relay/__init__.py
index 82aaa8a..426b03e 100644
--- a/relay/__init__.py
+++ b/relay/__init__.py
@@ -1,3 +1,3 @@
-__version__ = '0.2.3'
+__version__ = '0.2.4'

 from . import logger
diff --git a/relay/application.py b/relay/application.py
index cc2815b..dbe464f 100644
--- a/relay/application.py
+++ b/relay/application.py
@@ -1,14 +1,17 @@
 import asyncio
 import logging
 import os
+import queue
 import signal
+import threading
+import traceback

 from aiohttp import web
-from cachetools import LRUCache
 from datetime import datetime, timedelta

 from .config import RelayConfig
 from .database import RelayDatabase
+from .http_client import HttpClient
 from .misc import DotDict, check_open_port, set_app
 from .views import routes

@@ -19,25 +22,39 @@
 		self['starttime'] = None
 		self['running'] = False

-		self['is_docker'] = bool(os.environ.get('DOCKER_RUNNING'))
-		self['config'] = RelayConfig(cfgpath, self['is_docker'])
+		self['config'] = RelayConfig(cfgpath)

 		if not self['config'].load():
 			self['config'].save()

+		if self.config.is_docker:
+			self.config.update({
+				'db': '/data/relay.jsonld',
+				'listen': '0.0.0.0',
+				'port': 8080
+			})
+
+		self['workers'] = []
+		self['last_worker'] = 0
+
+		set_app(self)
+
 		self['database'] = RelayDatabase(self['config'])
 		self['database'].load()

-		self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys})
-		self['semaphore'] = asyncio.Semaphore(self['config'].push_limit)
+		self['client'] = HttpClient(
+			database = self.database,
+			limit = self.config.push_limit,
+			timeout = self.config.timeout,
+			cache_size = self.config.json_cache
+		)

 		self.set_signal_handler()
-		set_app(self)


 	@property
-	def cache(self):
-		return self['cache']
+	def client(self):
+		return self['client']


 	@property
@@ -50,16 +67,6 @@ class Application(web.Application):
 		return self['database']


-	@property
-	def is_docker(self):
-		return self['is_docker']
-
-
-	@property
-	def semaphore(self):
-		return self['semaphore']
-
-
 	@property
 	def uptime(self):
 		if not self['starttime']:
 			return

@@ -70,6 +77,19 @@
 		return timedelta(seconds=uptime.seconds)


+	def push_message(self, inbox, message):
+		if self.config.workers <= 0:
+			return asyncio.ensure_future(self.client.post(inbox, message))
+
+		worker = self['workers'][self['last_worker']]
+		worker.queue.put((inbox, message))
+
+		self['last_worker'] += 1
+
+		if self['last_worker'] >= len(self['workers']):
+			self['last_worker'] = 0
+
+
 	def set_signal_handler(self):
 		for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
 			try:
@@ -85,9 +105,6 @@
 			return logging.error(f'A server is already running on port {self.config.port}')

 		for route in routes:
-			if route[1] == '/stats' and logging.DEBUG < logging.root.level:
-				continue
-
 			self.router.add_route(*route)

 		logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
@@ -101,6 +118,13 @@
 	async def handle_run(self):
 		self['running'] = True

+		if self.config.workers > 0:
+			for i in range(self.config.workers):
+				worker = PushWorker(self)
+				worker.start()
+
+				self['workers'].append(worker)
+
 		runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
 		await runner.setup()
@@ -120,8 +144,73 @@
 		self['starttime'] = None
 		self['running'] = False
+		self['workers'].clear()


-class Cache(LRUCache):
-	def set_maxsize(self, value):
-		self.__maxsize = int(value)
+class PushWorker(threading.Thread):
+	def __init__(self, app):
+		threading.Thread.__init__(self)
+		self.app = app
+		self.queue = queue.Queue()
+
+
+	def run(self):
+		self.client = HttpClient(
+			database = self.app.database,
+			limit = self.app.config.push_limit,
+			timeout = self.app.config.timeout,
+			cache_size = self.app.config.json_cache
+		)
+
+		asyncio.run(self.handle_queue())
+
+
+	async def handle_queue(self):
+		while self.app['running']:
+			try:
+				inbox, message = self.queue.get(block=True, timeout=0.25)
+				self.queue.task_done()
+				logging.verbose(f'New push from Thread-{threading.get_ident()}')
+				await self.client.post(inbox, message)
+
+			except queue.Empty:
+				pass
+
+			## make sure an exception doesn't bring down the worker
+			except Exception:
+				traceback.print_exc()
+
+		await self.client.close()
+
+
+## Can't sub-class web.Request, so let's just add some properties
+def request_actor(self):
+	try: return self['actor']
+	except KeyError: pass
+
+
+def request_instance(self):
+	try: return self['instance']
+	except KeyError: pass
+
+
+def request_message(self):
+	try: return self['message']
+	except KeyError: pass
+
+
+def request_signature(self):
+	if 'signature' not in self._state:
+		try: self['signature'] = DotDict.new_from_signature(self.headers['signature'])
+		except KeyError: return
+
+	return self['signature']
+
+
+setattr(web.Request, 'actor', property(request_actor))
+setattr(web.Request, 'instance', property(request_instance))
+setattr(web.Request, 'message', property(request_message))
+setattr(web.Request, 'signature', property(request_signature))
+
+setattr(web.Request, 'config', property(lambda self: self.app.config))
+setattr(web.Request, 'database', property(lambda self: self.app.database))
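The scheduling in `push_message()` above is a plain round-robin over the worker queues. A toy sketch of the same behavior (standalone Python, not part of the patch; the modulo form is equivalent to the reset-to-zero logic above):

	import queue

	# Three hypothetical workers, as if the config had `workers: 3`.
	workers = [queue.Queue() for _ in range(3)]
	last_worker = 0

	def push_message(inbox, message):
		# Hand the message to the next worker queue in turn.
		global last_worker
		workers[last_worker].put((inbox, message))
		last_worker = (last_worker + 1) % len(workers)

	for i in range(6):
		push_message(f'https://instance{i}.example/inbox', {'id': i})

	print([w.qsize() for w in workers])  # -> [2, 2, 2]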
diff --git a/relay/config.py b/relay/config.py
index fd22f22..996fa9f 100644
--- a/relay/config.py
+++ b/relay/config.py
@@ -1,60 +1,51 @@
 import json
+import os
 import yaml

+from functools import cached_property
 from pathlib import Path
 from urllib.parse import urlparse

-from .misc import DotDict
+from .misc import DotDict, boolean


-relay_software_names = [
-	'activityrelay',
-	'aoderelay',
-	'social.seattle.wa.us-relay',
-	'unciarelay'
+RELAY_SOFTWARE = [
+	'activityrelay', # https://git.pleroma.social/pleroma/relay
+	'aoderelay', # https://git.asonix.dog/asonix/relay
+	'feditools-relay' # https://git.ptzo.gdn/feditools/relay
+]
+
+APKEYS = [
+	'host',
+	'whitelist_enabled',
+	'blocked_software',
+	'blocked_instances',
+	'whitelist'
 ]


 class RelayConfig(DotDict):
-	apkeys = {
-		'host',
-		'whitelist_enabled',
-		'blocked_software',
-		'blocked_instances',
-		'whitelist'
-	}
-
-	cachekeys = {
-		'json',
-		'objects',
-		'digests'
-	}
-
-
-	def __init__(self, path, is_docker):
+	def __init__(self, path):
 		DotDict.__init__(self, {})

-		if is_docker:
-			path = '/data/relay.yaml'
+		if self.is_docker:
+			path = '/data/config.yaml'

-		self._isdocker = is_docker
 		self._path = Path(path).expanduser()
-
 		self.reset()


 	def __setitem__(self, key, value):
-		if self._isdocker and key in ['db', 'listen', 'port']:
-			return
-
 		if key in ['blocked_instances', 'blocked_software', 'whitelist']:
 			assert isinstance(value, (list, set, tuple))

-		elif key in ['port', 'json', 'objects', 'digests']:
-			assert isinstance(value, (int))
+		elif key in ['port', 'workers', 'json_cache', 'timeout']:
+			if not isinstance(value, int):
+				value = int(value)

 		elif key == 'whitelist_enabled':
-			assert isinstance(value, bool)
+			if not isinstance(value, bool):
+				value = boolean(value)

 		super().__setitem__(key, value)
@@ -84,6 +75,11 @@
 		return f'{self.actor}#main-key'


+	@cached_property
+	def is_docker(self):
+		return bool(os.environ.get('DOCKER_RUNNING'))
+
+
 	def reset(self):
 		self.clear()
 		self.update({
@@ -92,16 +88,17 @@
 			'port': 8080,
 			'note': 'Make a note about your instance here.',
 			'push_limit': 512,
+			'json_cache': 1024,
+			'timeout': 10,
+			'workers': 0,
 			'host': 'relay.example.com',
+			'whitelist_enabled': False,
 			'blocked_software': [],
 			'blocked_instances': [],
-			'whitelist': [],
-			'whitelist_enabled': False,
-			'json': 1024,
-			'objects': 1024,
-			'digests': 1024
+			'whitelist': []
 		})

+
 	def ban_instance(self, instance):
 		if instance.startswith('http'):
 			instance = urlparse(instance).hostname
@@ -208,13 +205,15 @@
 			return False

 		for key, value in config.items():
-			if key in ['ap', 'cache']:
+			if key in ['ap']:
 				for k, v in value.items():
 					if k not in self:
 						continue

 					self[k] = v

+				continue
+
 			elif key not in self:
 				continue

@@ -228,13 +227,16 @@
 	def save(self):
 		config = {
-			'db': self['db'],
+			# just turning config.db into a string is good enough for now
+			'db': str(self.db),
 			'listen': self.listen,
 			'port': self.port,
 			'note': self.note,
 			'push_limit': self.push_limit,
-			'ap': {key: self[key] for key in self.apkeys},
-			'cache': {key: self[key] for key in self.cachekeys}
+			'workers': self.workers,
+			'json_cache': self.json_cache,
+			'timeout': self.timeout,
+			'ap': {key: self[key] for key in APKEYS}
 		}

 		with open(self._path, 'w') as fd:
diff --git a/relay/database.py b/relay/database.py
index 90b1473..ad093cd 100644
--- a/relay/database.py
+++ b/relay/database.py
@@ -1,8 +1,9 @@
+import aputils
+import asyncio
 import json
 import logging
 import traceback

-from Crypto.PublicKey import RSA
 from urllib.parse import urlparse

@@ -16,22 +17,7 @@
 		})

 		self.config = config
-		self.PRIVKEY = None
-
-
-	@property
-	def PUBKEY(self):
-		return self.PRIVKEY.publickey()
-
-
-	@property
-	def pubkey(self):
-		return self.PUBKEY.exportKey('PEM').decode('utf-8')
-
-
-	@property
-	def privkey(self):
-		return self['private-key']
+		self.signer = None


 	@property
@@ -44,11 +30,6 @@
 		return tuple(data['inbox'] for data in self['relay-list'].values())


-	def generate_key(self):
-		self.PRIVKEY = RSA.generate(4096)
-		self['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8')
-
-
 	def load(self):
 		new_db = True
@@ -68,6 +49,7 @@
 			for item in data.get('relay-list', []):
 				domain = urlparse(item).hostname
 				self['relay-list'][domain] = {
+					'domain': domain,
 					'inbox': item,
 					'followid': None
 				}

 		else:
 			self['relay-list'] = data.get('relay-list', {})

-		for domain in self['relay-list'].keys():
+		for domain, instance in self['relay-list'].items():
 			if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)):
 				self.del_inbox(domain)
+				continue
+
+			if not instance.get('domain'):
+				instance['domain'] = domain

 		new_db = False
@@ -88,12 +74,13 @@
 			if self.config.db.stat().st_size > 0:
 				raise e from None

-		if not self.privkey:
+		if not self['private-key']:
 			logging.info("No actor keys present, generating 4096-bit RSA keypair.")
-			self.generate_key()
+			self.signer = aputils.Signer.new(self.config.keyid, size=4096)
+			self['private-key'] = self.signer.export()

 		else:
-			self.PRIVKEY = RSA.importKey(self.privkey)
+			self.signer = aputils.Signer(self['private-key'], self.config.keyid)

 		self.save()
 		return not new_db
@@ -108,29 +95,34 @@
 		if domain.startswith('http'):
 			domain = urlparse(domain).hostname

-		if domain not in self['relay-list']:
-			if fail:
-				raise KeyError(domain)
+		inbox = self['relay-list'].get(domain)

-			return
+		if inbox:
+			return inbox

-		return self['relay-list'][domain]
+		if fail:
+			raise KeyError(domain)


-	def add_inbox(self, inbox, followid=None, fail=False):
+	def add_inbox(self, inbox, followid=None, software=None):
 		assert inbox.startswith('https'), 'Inbox must be a url'
 		domain = urlparse(inbox).hostname
+		instance = self.get_inbox(domain)

-		if self.get_inbox(domain):
-			if fail:
-				raise KeyError(domain)
+		if instance:
+			if followid:
+				instance['followid'] = followid

-			return False
+			if software:
+				instance['software'] = software
+
+			return instance

 		self['relay-list'][domain] = {
 			'domain': domain,
 			'inbox': inbox,
-			'followid': followid
+			'followid': followid,
+			'software': software
 		}

 		logging.verbose(f'Added inbox to database: {inbox}')
@@ -158,11 +150,6 @@
 		return False


-	def set_followid(self, domain, followid):
-		data = self.get_inbox(domain, fail=True)
-		data['followid'] = followid
-
-
 	def get_request(self, domain, fail=True):
 		if domain.startswith('http'):
 			domain = urlparse(domain).hostname
@@ -197,3 +184,14 @@
 		domain = urlparse(inbox).hostname

 		del self['follow-requests'][domain]
+
+
+	def distill_inboxes(self, message):
+		src_domains = {
+			message.domain,
+			urlparse(message.objectid).netloc
+		}
+
+		for domain, instance in self['relay-list'].items():
+			if domain not in src_domains:
+				yield instance['inbox']
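The new `distill_inboxes()` replaces the old helper of the same name in `misc.py`: instead of comparing against a single actor's shared inbox, it skips every inbox whose domain matches one of the message's source domains. A self-contained sketch of the idea (hypothetical data, not part of the patch):

	from urllib.parse import urlparse

	relay_list = {
		'a.example': {'inbox': 'https://a.example/inbox'},
		'b.example': {'inbox': 'https://b.example/inbox'}
	}

	def distill_inboxes(message_domain, objectid):
		# Skip inboxes belonging to the domains the message came from.
		src_domains = {message_domain, urlparse(objectid).netloc}

		for domain, instance in relay_list.items():
			if domain not in src_domains:
				yield instance['inbox']

	print(list(distill_inboxes('a.example', 'https://a.example/note/1')))
	# -> ['https://b.example/inbox']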
diff --git a/relay/http_client.py b/relay/http_client.py
new file mode 100644
index 0000000..8802471
--- /dev/null
+++ b/relay/http_client.py
@@ -0,0 +1,192 @@
+import logging
+import traceback
+
+from aiohttp import ClientSession, ClientTimeout, TCPConnector
+from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError
+from aputils import Nodeinfo, WellKnownNodeinfo
+from datetime import datetime
+from cachetools import LRUCache
+from json.decoder import JSONDecodeError
+from urllib.parse import urlparse
+
+from . import __version__
+from .misc import (
+	MIMETYPES,
+	DotDict,
+	Message
+)
+
+
+HEADERS = {
+	'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
+	'User-Agent': f'ActivityRelay/{__version__}'
+}
+
+
+class Cache(LRUCache):
+	def set_maxsize(self, value):
+		self.__maxsize = int(value)
+
+
+class HttpClient:
+	def __init__(self, database, limit=100, timeout=10, cache_size=1024):
+		self.database = database
+		self.cache = Cache(cache_size)
+		self.cfg = {'limit': limit, 'timeout': timeout}
+		self._conn = None
+		self._session = None
+
+
+	@property
+	def limit(self):
+		return self.cfg['limit']
+
+
+	@property
+	def timeout(self):
+		return self.cfg['timeout']
+
+
+	async def open(self):
+		if self._session:
+			return
+
+		self._conn = TCPConnector(
+			limit = self.limit,
+			ttl_dns_cache = 300,
+		)
+
+		self._session = ClientSession(
+			connector = self._conn,
+			headers = HEADERS,
+			connector_owner = True,
+			timeout = ClientTimeout(total=self.timeout)
+		)
+
+
+	async def close(self):
+		if not self._session:
+			return
+
+		await self._session.close()
+		await self._conn.close()
+
+		self._conn = None
+		self._session = None
+
+
+	async def get(self, url, sign_headers=False, loads=None, force=False):
+		await self.open()
+
+		try: url, _ = url.split('#', 1)
+		except: pass
+
+		if not force and url in self.cache:
+			return self.cache[url]
+
+		headers = {}
+
+		if sign_headers:
+			headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
+
+		try:
+			logging.verbose(f'Fetching resource: {url}')
+
+			async with self._session.get(url, headers=headers) as resp:
+				## Not expecting a response with 202s, so just return
+				if resp.status == 202:
+					return
+
+				elif resp.status != 200:
+					logging.verbose(f'Received error when requesting {url}: {resp.status}')
+					logging.verbose(await resp.read()) # change this to debug
+					return
+
+				if loads:
+					message = await resp.json(loads=loads)
+
+				elif resp.content_type == MIMETYPES['activity']:
+					message = await resp.json(loads=Message.new_from_json)
+
+				elif resp.content_type == MIMETYPES['json']:
+					message = await resp.json(loads=DotDict.new_from_json)
+
+				else:
+					# todo: raise TypeError or something
+					logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
+					return logging.debug(f'Response: {await resp.read()}')
+
+				logging.debug(f'{url} >> resp {message.to_json(4)}')
+
+				self.cache[url] = message
+				return message
+
+		except JSONDecodeError:
+			logging.verbose('Failed to parse JSON')
+
+		except (ClientConnectorError, ServerTimeoutError):
+			logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
+
+		except Exception as e:
+			traceback.print_exc()
+			raise e
+
+
+	async def post(self, url, message):
+		await self.open()
+
+		instance = self.database.get_inbox(url)
+
+		## Using the old algo by default is probably a better idea right now
+		if instance and instance.get('software') in {'mastodon'}:
+			algorithm = 'hs2019'
+
+		else:
+			algorithm = 'original'
+
+		headers = {'Content-Type': 'application/activity+json'}
+		headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm))
+
+		try:
+			logging.verbose(f'Sending "{message.type}" to {url}')
+
+			async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
+				## Not expecting a response, so just return
+				if resp.status in {200, 202}:
+					return logging.verbose(f'Successfully sent "{message.type}" to {url}')
+
+				logging.verbose(f'Received error when pushing to {url}: {resp.status}')
+				return logging.verbose(await resp.read()) # change this to debug
+
+		except (ClientConnectorError, ServerTimeoutError):
+			logging.verbose(f'Failed to connect to {url}')
+
+		## prevent workers from being brought down
+		except Exception as e:
+			traceback.print_exc()
+
+
+	## Additional methods ##
+	async def fetch_nodeinfo(self, domain):
+		nodeinfo_url = None
+		wk_nodeinfo = await self.get(
+			f'https://{domain}/.well-known/nodeinfo',
+			loads = WellKnownNodeinfo.new_from_json
+		)
+
+		if not wk_nodeinfo:
+			logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
+			return False
+
+		for version in ['20', '21']:
+			try:
+				nodeinfo_url = wk_nodeinfo.get_url(version)
+
+			except KeyError:
+				pass
+
+		if not nodeinfo_url:
+			logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
+			return False
+
+		return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False
diff --git a/relay/http_debug.py b/relay/http_debug.py
deleted file mode 100644
index 2a2ae67..0000000
--- a/relay/http_debug.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import logging
-import aiohttp
-
-from collections import defaultdict
-
-
-STATS = {
-	'requests': defaultdict(int),
-	'response_codes': defaultdict(int),
-	'response_codes_per_domain': defaultdict(lambda: defaultdict(int)),
-	'delivery_codes': defaultdict(int),
-	'delivery_codes_per_domain': defaultdict(lambda: defaultdict(int)),
-	'exceptions': defaultdict(int),
-	'exceptions_per_domain': defaultdict(lambda: defaultdict(int)),
-	'delivery_exceptions': defaultdict(int),
-	'delivery_exceptions_per_domain': defaultdict(lambda: defaultdict(int))
-}
-
-
-async def on_request_start(session, trace_config_ctx, params):
-	global STATS
-
-	logging.debug("HTTP START [%r], [%r]", session, params)
-
-	STATS['requests'][params.url.host] += 1
-
-
-async def on_request_end(session, trace_config_ctx, params):
-	global STATS
-
-	logging.debug("HTTP END [%r], [%r]", session, params)
-
-	host = params.url.host
-	status = params.response.status
-
-	STATS['response_codes'][status] += 1
-	STATS['response_codes_per_domain'][host][status] += 1
-
-	if params.method == 'POST':
-		STATS['delivery_codes'][status] += 1
-		STATS['delivery_codes_per_domain'][host][status] += 1
-
-
-async def on_request_exception(session, trace_config_ctx, params):
-	global STATS
-
-	logging.debug("HTTP EXCEPTION [%r], [%r]", session, params)
-
-	host = params.url.host
-	exception = repr(params.exception)
-
-	STATS['exceptions'][exception] += 1
-	STATS['exceptions_per_domain'][host][exception] += 1
-
-	if params.method == 'POST':
-		STATS['delivery_exceptions'][exception] += 1
-		STATS['delivery_exceptions_per_domain'][host][exception] += 1
-
-
-def http_debug():
-	if logging.DEBUG >= logging.root.level:
-		return
-
-	trace_config = aiohttp.TraceConfig()
-	trace_config.on_request_start.append(on_request_start)
-	trace_config.on_request_end.append(on_request_end)
-	trace_config.on_request_exception.append(on_request_exception)
-	return [trace_config]
diff --git a/relay/manage.py b/relay/manage.py
index 0a543f5..0d7decc 100644
--- a/relay/manage.py
+++ b/relay/manage.py
@@ -8,10 +8,11 @@
 from . import misc, __version__
 from .application import Application
-from .config import relay_software_names
+from .config import RELAY_SOFTWARE


 app = None
+CONFIG_IGNORE = {'blocked_software', 'blocked_instances', 'whitelist'}


 @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True)
@@ -24,15 +25,95 @@ def cli(ctx, config):

 	if not ctx.invoked_subcommand:
 		if app.config.host.endswith('example.com'):
-			relay_setup.callback()
+			cli_setup.callback()

 		else:
-			relay_run.callback()
+			cli_run.callback()
+
+
+@cli.command('setup')
+def cli_setup():
+	'Generate a new config'
+
+	while True:
+		app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
+
+		if not app.config.host.endswith('example.com'):
+			break
+
+		click.echo('The domain must not be example.com')
+
+	if not app.config.is_docker:
+		app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
+
+		while True:
+			app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
+			break
+
+	app.config.save()
+
+	if not app.config.is_docker and click.confirm('Relay is all set up! Would you like to run it now?'):
+		cli_run.callback()
+
+
+@cli.command('run')
+def cli_run():
+	'Run the relay'
+
+	if app.config.host.endswith('example.com'):
+		return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
+
+	vers_split = platform.python_version().split('.')
+	pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
+
+	if Crypto.__version__ == '2.6.1':
+		if int(vers_split[1]) > 7:
+			click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
+			return click.echo(pip_command)
+
+		else:
+			click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
+			return click.echo(pip_command)
+
+	if not misc.check_open_port(app.config.listen, app.config.port):
+		return click.echo(f'Error: A server is already running on port {app.config.port}')
+
+	app.run()
+
+
+# todo: add config default command for resetting config key
+@cli.group('config')
+def cli_config():
+	'Manage the relay config'
+	pass
+
+
+@cli_config.command('list')
+def cli_config_list():
+	'List the current relay config'
+
+	click.echo('Relay Config:')
+
+	for key, value in app.config.items():
+		if key not in CONFIG_IGNORE:
+			key = f'{key}:'.ljust(20)
+			click.echo(f'- {key} {value}')
+
+
+@cli_config.command('set')
+@click.argument('key')
+@click.argument('value')
+def cli_config_set(key, value):
+	'Set a config value'
+
+	app.config[key] = value
+	app.config.save()
+
+	print(f'{key}: {app.config[key]}')


 @cli.group('inbox')
-@click.pass_context
-def cli_inbox(ctx):
+def cli_inbox():
 	'Manage the inboxes in the database'
 	pass

@@ -67,15 +148,19 @@ def cli_inbox_follow(actor):
 		inbox = inbox_data['inbox']

 	except KeyError:
-		actor_data = asyncio.run(misc.request(actor))
+		actor_data = asyncio.run(app.client.get(actor, sign_headers=True))
+
+		if not actor_data:
+			return click.echo(f'Failed to fetch actor: {actor}')
+
 		inbox = actor_data.shared_inbox

 	message = misc.Message.new_follow(
 		host = app.config.host,
-		actor = actor.id
+		actor = actor
 	)

-	asyncio.run(misc.request(inbox, message))
+	asyncio.run(app.client.post(inbox, message))
 	click.echo(f'Sent follow message to actor: {actor}')

@@ -101,7 +186,7 @@ def cli_inbox_unfollow(actor):

 	except KeyError:
-		actor_data = asyncio.run(misc.request(actor))
+		actor_data = asyncio.run(app.client.get(actor, sign_headers=True))
 		inbox = actor_data.shared_inbox

 	message = misc.Message.new_unfollow(
 		host = app.config.host,
 		actor = actor.id,
@@ -113,7 +198,7 @@
 		}
 	)

-	asyncio.run(misc.request(inbox, message))
+	asyncio.run(app.client.post(inbox, message))
 	click.echo(f'Sent unfollow message to: {actor}')

@@ -128,11 +213,13 @@ def cli_inbox_add(inbox):
 	if app.config.is_banned(inbox):
 		return click.echo(f'Error: Refusing to add banned inbox: {inbox}')

-	if app.database.add_inbox(inbox):
-		app.database.save()
-		return click.echo(f'Added inbox to the database: {inbox}')
+	if app.database.get_inbox(inbox):
+		return click.echo(f'Error: Inbox already in database: {inbox}')

-	click.echo(f'Error: Inbox already in database: {inbox}')
+	app.database.add_inbox(inbox)
+	app.database.save()
+
+	click.echo(f'Added inbox to the database: {inbox}')


 @cli_inbox.command('remove')
@@ -228,21 +315,21 @@ def cli_software_ban(name, fetch_nodeinfo):
 	'Ban software. Use RELAYS for NAME to ban relays'

 	if name == 'RELAYS':
-		for name in relay_software_names:
+		for name in RELAY_SOFTWARE:
 			app.config.ban_software(name)

 		app.config.save()
 		return click.echo('Banned all relay software')

 	if fetch_nodeinfo:
-		software = asyncio.run(misc.fetch_nodeinfo(name))
+		nodeinfo = asyncio.run(app.client.fetch_nodeinfo(name))

-		if not software:
-			click.echo(f'Failed to fetch software name from domain: {name}')
+		if not nodeinfo:
+			return click.echo(f'Failed to fetch software name from domain: {name}')

-		name = software
+		name = nodeinfo.sw_name

-	if config.ban_software(name):
+	if app.config.ban_software(name):
 		app.config.save()
 		return click.echo(f'Banned software: {name}')

@@ -258,19 +345,19 @@ def cli_software_unban(name, fetch_nodeinfo):
 	'Unban software. Use RELAYS for NAME to unban relays'

 	if name == 'RELAYS':
-		for name in relay_software_names:
+		for name in RELAY_SOFTWARE:
 			app.config.unban_software(name)

-		config.save()
+		app.config.save()
 		return click.echo('Unbanned all relay software')

 	if fetch_nodeinfo:
-		software = asyncio.run(misc.fetch_nodeinfo(name))
+		nodeinfo = asyncio.run(app.client.fetch_nodeinfo(name))

-		if not software:
-			click.echo(f'Failed to fetch software name from domain: {name}')
+		if not nodeinfo:
+			return click.echo(f'Failed to fetch software name from domain: {name}')

-		name = software
+		name = nodeinfo.sw_name

 	if app.config.unban_software(name):
 		app.config.save()
@@ -279,7 +366,6 @@
 	click.echo(f'Software wasn\'t banned: {name}')

-
 @cli.group('whitelist')
 def cli_whitelist():
 	'Manage the instance whitelist'

@@ -288,6 +374,8 @@
 @cli_whitelist.command('list')
 def cli_whitelist_list():
+	'List all the instances in the whitelist'
+
 	click.echo('Current whitelisted domains')

 	for domain in app.config.whitelist:
@@ -317,59 +405,18 @@ def cli_whitelist_remove(instance):
 	app.config.save()

 	if app.config.whitelist_enabled:
-		if app.database.del_inbox(inbox):
+		if app.database.del_inbox(instance):
 			app.database.save()

 	click.echo(f'Removed instance from the whitelist: {instance}')


-@cli.command('setup')
-def relay_setup():
-	'Generate a new config'
+@cli_whitelist.command('import')
+def cli_whitelist_import():
+	'Add all current inboxes to the whitelist'

-	while True:
-		app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
-
-		if not app.config.host.endswith('example.com'):
-			break
-
-		click.echo('The domain must not be example.com')
-
-	app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
-
-	while True:
-		app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
-		break
-
-	app.config.save()
-
-	if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'):
-		relay_run.callback()
-
-
-@cli.command('run')
-def relay_run():
-	'Run the relay'
-
-	if app.config.host.endswith('example.com'):
-		return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
-
-	vers_split = platform.python_version().split('.')
-	pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
-
-	if Crypto.__version__ == '2.6.1':
-		if int(vers_split[1]) > 7:
-			click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
-			return click.echo(pip_command)
-
-		else:
-			click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
-			return click.echo(pip_command)
-
-	if not misc.check_open_port(app.config.listen, app.config.port):
-		return click.echo(f'Error: A server is already running on port {app.config.port}')
-
-	app.run()
+	for domain in app.database.hostnames:
+		cli_whitelist_add.callback(domain)


 def main():
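Taken together, the new `config` group and `whitelist import` command would be used along these lines (hypothetical session; values are only examples):

	activityrelay config list
	activityrelay config set workers 4
	activityrelay whitelist import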
diff --git a/relay/misc.py b/relay/misc.py
index e5f362e..a98088f 100644
--- a/relay/misc.py
+++ b/relay/misc.py
@@ -1,3 +1,4 @@
+import aputils
 import asyncio
 import base64
 import json
@@ -6,10 +7,6 @@
 import socket
 import traceback
 import uuid

-from Crypto.Hash import SHA, SHA256, SHA512
-from Crypto.PublicKey import RSA
-from Crypto.Signature import PKCS1_v1_5
-from aiohttp import ClientSession
 from aiohttp.hdrs import METH_ALL as METHODS
 from aiohttp.web import Response as AiohttpResponse, View as AiohttpView
 from datetime import datetime
@@ -17,17 +14,9 @@
 from json.decoder import JSONDecodeError
 from urllib.parse import urlparse
 from uuid import uuid4

-from .http_debug import http_debug
-

 app = None

-HASHES = {
-	'sha1': SHA,
-	'sha256': SHA256,
-	'sha512': SHA512
-}
-
 MIMETYPES = {
 	'activity': 'application/activity+json',
 	'html': 'text/html',
@@ -46,8 +35,35 @@ def set_app(new_app):
 	app = new_app


-def build_signing_string(headers, used_headers):
-	return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers))
+def boolean(value):
+	if isinstance(value, str):
+		if value.lower() in ['on', 'y', 'yes', 'true', 'enable', 'enabled', '1']:
+			return True
+
+		elif value.lower() in ['off', 'n', 'no', 'false', 'disable', 'disabled', '0']:
+			return False
+
+		else:
+			raise TypeError(f'Cannot parse string "{value}" as a boolean')
+
+	elif isinstance(value, int):
+		if value == 1:
+			return True
+
+		elif value == 0:
+			return False
+
+		else:
+			raise ValueError('Integer value must be 1 or 0')
+
+	elif value == None:
+		return False
+
+	try:
+		return value.__bool__()
+
+	except AttributeError:
+		raise TypeError(f'Cannot convert object of type "{clsname(value)}"')


 def check_open_port(host, port):
@@ -62,203 +78,6 @@
 	return False


-def create_signature_header(headers):
-	headers = {k.lower(): v for k, v in headers.items()}
-	used_headers = headers.keys()
-	sigstring = build_signing_string(headers, used_headers)
-
-	sig = {
-		'keyId': app.config.keyid,
-		'algorithm': 'rsa-sha256',
-		'headers': ' '.join(used_headers),
-		'signature': sign_signing_string(sigstring, app.database.PRIVKEY)
-	}
-
-	chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()]
-	return ','.join(chunks)
-
-
-def distill_inboxes(actor, object_id):
-	for inbox in app.database.inboxes:
-		if inbox != actor.shared_inbox and urlparse(inbox).hostname != urlparse(object_id).hostname:
-			yield inbox
-
-
-def generate_body_digest(body):
-	bodyhash = app.cache.digests.get(body)
-
-	if bodyhash:
-		return bodyhash
-
-	h = SHA256.new(body.encode('utf-8'))
-	bodyhash = base64.b64encode(h.digest()).decode('utf-8')
-	app.cache.digests[body] = bodyhash
-
-	return bodyhash
-
-
-def sign_signing_string(sigstring, key):
-	pkcs = PKCS1_v1_5.new(key)
-	h = SHA256.new()
-	h.update(sigstring.encode('ascii'))
-	sigdata = pkcs.sign(h)
-
-	return base64.b64encode(sigdata).decode('utf-8')
-
-
-async def fetch_actor_key(actor):
-	actor_data = await request(actor)
-
-	if not actor_data:
-		return None
-
-	try:
-		return RSA.importKey(actor_data['publicKey']['publicKeyPem'])
-
-	except Exception as e:
-		logging.debug(f'Exception occured while fetching actor key: {e}')
-
-
-async def fetch_nodeinfo(domain):
-	nodeinfo_url = None
-	wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False)
-
-	if not wk_nodeinfo:
-		return
-
-	wk_nodeinfo = WKNodeinfo(wk_nodeinfo)
-
-	for version in ['20', '21']:
-		try:
-			nodeinfo_url = wk_nodeinfo.get_url(version)
-
-		except KeyError:
-			pass
-
-	if not nodeinfo_url:
-		logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
-		return False
-
-	nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False)
-
-	try:
-		return nodeinfo['software']['name']
-
-	except KeyError:
-		return False
-
-
-async def request(uri, data=None, force=False, sign_headers=True, activity=True):
-	## If a get request and not force, try to use the cache first
-	if not data and not force:
-		try:
-			return app.cache.json[uri]
-
-		except KeyError:
-			pass
-
-	url = urlparse(uri)
-	method = 'POST' if data else 'GET'
-	action = data.get('type') if data else None
-	headers = {
-		'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
-		'User-Agent': 'ActivityRelay',
-	}
-
-	if data:
-		headers['Content-Type'] = MIMETYPES['activity' if activity else 'json']
-
-	if sign_headers:
-		signing_headers = {
-			'(request-target)': f'{method.lower()} {url.path}',
-			'Date': datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'),
-			'Host': url.netloc
-		}
-
-		if data:
-			assert isinstance(data, dict)
-
-			data = json.dumps(data)
-			signing_headers.update({
-				'Digest': f'SHA-256={generate_body_digest(data)}',
-				'Content-Length': str(len(data.encode('utf-8')))
-			})
-
-		signing_headers['Signature'] = create_signature_header(signing_headers)
-
-		del signing_headers['(request-target)']
-		del signing_headers['Host']
-
-		headers.update(signing_headers)
-
-	try:
-		if data:
-			logging.verbose(f'Sending "{action}" to inbox: {uri}')
-
-		else:
-			logging.verbose(f'Sending GET request to url: {uri}')
-
-		async with ClientSession(trace_configs=http_debug()) as session, app.semaphore:
-			async with session.request(method, uri, headers=headers, data=data) as resp:
-				## aiohttp has been known to leak if the response hasn't been read,
-				## so we're just gonna read the request no matter what
-				resp_data = await resp.read()
-
-				## Not expecting a response, so just return
-				if resp.status == 202:
-					return
-
-				elif resp.status != 200:
-					if not resp_data:
-						return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}')
-
-					return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}')
-
-				if resp.content_type == MIMETYPES['activity']:
-					resp_data = await resp.json(loads=Message.new_from_json)
-
-				elif resp.content_type == MIMETYPES['json']:
-					resp_data = await resp.json(loads=DotDict.new_from_json)
-
-				else:
-					logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
-					return logging.debug(f'Response: {resp_data}')
-
-				logging.debug(f'{uri} >> resp {resp_data}')
-
-				app.cache.json[uri] = resp_data
-				return resp_data
-
-	except JSONDecodeError:
-		return
-
-	except Exception:
-		traceback.print_exc()
-
-
-async def validate_signature(actor, signature, http_request):
-	headers = {key.lower(): value for key, value in http_request.headers.items()}
-	headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path])
-
-	sigstring = build_signing_string(headers, signature['headers'])
-	logging.debug(f'sigstring: {sigstring}')
-
-	sign_alg, _, hash_alg = signature['algorithm'].partition('-')
-	logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}')
-
-	sigdata = base64.b64decode(signature['signature'])
-
-	pkcs = PKCS1_v1_5.new(actor.PUBKEY)
-	h = HASHES[hash_alg].new()
-	h.update(sigstring.encode('ascii'))
-	result = pkcs.verify(h, sigdata)
-
-	http_request['validated'] = result
-
-	logging.debug(f'validates? {result}')
-	return result
-
-
 class DotDict(dict):
 	def __init__(self, _data, **kwargs):
 		dict.__init__(self)
@@ -427,16 +246,6 @@
 	# actor properties
-	@property
-	def PUBKEY(self):
-		return RSA.import_key(self.pubkey)
-
-
-	@property
-	def pubkey(self):
-		return self.publicKey.publicKeyPem
-
-
 	@property
 	def shared_inbox(self):
 		return self.get('endpoints', {}).get('sharedInbox', self.inbox)
@@ -459,6 +268,11 @@
 		return self.object


+	@property
+	def signer(self):
+		return aputils.Signer.new_from_actor(self)
+
+
 class Response(AiohttpResponse):
 	@classmethod
 	def new(cls, body='', status=200, headers=None, ctype='text'):
@@ -516,11 +330,6 @@
 		return self._request.app


-	@property
-	def cache(self):
-		return self.app.cache
-
-
 	@property
 	def config(self):
 		return self.app.config
@@ -529,22 +338,3 @@
 	@property
 	def database(self):
 		return self.app.database
-
-
-class WKNodeinfo(DotDict):
-	@classmethod
-	def new(cls, v20, v21):
-		return cls({
-			'links': [
-				{'rel': NODEINFO_NS['20'], 'href': v20},
-				{'rel': NODEINFO_NS['21'], 'href': v21}
-			]
-		})
-
-
-	def get_url(self, version='20'):
-		for item in self.links:
-			if item['rel'] == NODEINFO_NS[version]:
-				return item['href']
-
-		raise KeyError(version)
diff --git a/relay/processors.py b/relay/processors.py
index 69f8b59..2d34246 100644
--- a/relay/processors.py
+++ b/relay/processors.py
@@ -1,64 +1,104 @@
 import asyncio
 import logging

+from cachetools import LRUCache
 from uuid import uuid4

-from . import misc
+from .misc import Message


-async def handle_relay(request, actor, data, software):
-	if data.objectid in request.app.cache.objects:
-		logging.verbose(f'already relayed {data.objectid}')
+cache = LRUCache(1024)
+
+
+def person_check(actor, software):
+	## pleroma and akkoma use Person for the actor type for some reason
+	if software in {'akkoma', 'pleroma'} and actor.id != f'https://{actor.domain}/relay':
+		return True
+
+	## make sure the actor is an application
+	elif actor.type != 'Application':
+		return True
+
+
+async def handle_relay(request):
+	if request.message.objectid in cache:
+		logging.verbose(f'already relayed {request.message.objectid}')
 		return

-	logging.verbose(f'Relaying post from {data.actorid}')
-
-	message = misc.Message.new_announce(
-		host = request.app.config.host,
-		object = data.objectid
+	message = Message.new_announce(
+		host = request.config.host,
+		object = request.message.objectid
 	)

+	cache[request.message.objectid] = message.id
 	logging.debug(f'>> relay: {message}')

-	inboxes = misc.distill_inboxes(actor, data.objectid)
-	futures = [misc.request(inbox, data=message) for inbox in inboxes]
+	inboxes = request.database.distill_inboxes(request.message)

-	asyncio.ensure_future(asyncio.gather(*futures))
-	request.app.cache.objects[data.objectid] = message.id
+	for inbox in inboxes:
+		request.app.push_message(inbox, message)


-async def handle_forward(request, actor, data, software):
-	if data.id in request.app.cache.objects:
-		logging.verbose(f'already forwarded {data.id}')
+async def handle_forward(request):
+	if request.message.id in cache:
+		logging.verbose(f'already forwarded {request.message.id}')
 		return

-	message = misc.Message.new_announce(
-		host = request.app.config.host,
-		object = data
+	message = Message.new_announce(
+		host = request.config.host,
+		object = request.message
 	)

-	logging.verbose(f'Forwarding post from {actor.id}')
-	logging.debug(f'>> Relay {data}')
+	cache[request.message.id] = message.id
+	logging.debug(f'>> forward: {message}')

-	inboxes = misc.distill_inboxes(actor, data.id)
-	futures = [misc.request(inbox, data=message) for inbox in inboxes]
+	inboxes = request.database.distill_inboxes(request.message)

-	asyncio.ensure_future(asyncio.gather(*futures))
-	request.app.cache.objects[data.id] = message.id
+	for inbox in inboxes:
+		request.app.push_message(inbox, message)


-async def handle_follow(request, actor, data, software):
-	if not request.app.database.add_inbox(actor.shared_inbox, data.id):
-		request.app.database.set_followid(actor.id, data.id)
+async def handle_follow(request):
+	nodeinfo = await request.app.client.fetch_nodeinfo(request.actor.domain)
+	software = nodeinfo.sw_name if nodeinfo else None

-	request.app.database.save()
+	## reject if software used by actor is banned
+	if request.config.is_banned_software(software):
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_response(
+				host = request.config.host,
+				actor = request.actor.id,
+				followid = request.message.id,
+				accept = False
+			)
+		)

-	await misc.request(
-		actor.shared_inbox,
-		misc.Message.new_response(
-			host = request.app.config.host,
-			actor = actor.id,
-			followid = data.id,
+		return logging.verbose(f'Rejected follow from actor for using specific software: actor={request.actor.id}, software={software}')
+
+	## reject if the actor is not an instance actor
+	if person_check(request.actor, software):
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_response(
+				host = request.config.host,
+				actor = request.actor.id,
+				followid = request.message.id,
+				accept = False
+			)
+		)
+
+		return logging.verbose(f'Non-application actor tried to follow: {request.actor.id}')
+
+	request.database.add_inbox(request.actor.shared_inbox, request.message.id, software)
+	request.database.save()
+
+	request.app.push_message(
+		request.actor.shared_inbox,
+		Message.new_response(
+			host = request.config.host,
+			actor = request.actor.id,
+			followid = request.message.id,
 			accept = True
 		)
 	)
@@ -66,33 +106,34 @@
 	# Are Akkoma and Pleroma the only two that expect a follow back?
 	# Ignoring only Mastodon for now
 	if software != 'mastodon':
-		await misc.request(
-			actor.shared_inbox,
-			misc.Message.new_follow(
-				host = request.app.config.host,
-				actor = actor.id
+		request.app.push_message(
+			request.actor.shared_inbox,
+			Message.new_follow(
+				host = request.config.host,
+				actor = request.actor.id
 			)
 		)


-async def handle_undo(request, actor, data, software):
+async def handle_undo(request):
 	## If the object is not a Follow, forward it
-	if data['object']['type'] != 'Follow':
-		return await handle_forward(request, actor, data, software)
+	if request.message.object.type != 'Follow':
+		return await handle_forward(request)

-	if not request.app.database.del_inbox(actor.domain, data.id):
+	if not request.database.del_inbox(request.actor.domain, request.message.id):
 		return

-	request.app.database.save()
+	request.database.save()

-	message = misc.Message.new_unfollow(
-		host = request.app.config.host,
-		actor = actor.id,
-		follow = data
+	request.app.push_message(
+		request.actor.shared_inbox,
+		Message.new_unfollow(
+			host = request.config.host,
+			actor = request.actor.id,
+			follow = request.message
+		)
 	)

-	await misc.request(actor.shared_inbox, message)
-

 processors = {
 	'Announce': handle_relay,
@@ -104,9 +145,16 @@
 }


-async def run_processor(request, actor, data, software):
-	if data.type not in processors:
+async def run_processor(request):
+	if request.message.type not in processors:
 		return

-	logging.verbose(f'New "{data.type}" from actor: {actor.id}')
-	return await processors[data.type](request, actor, data, software)
+	if request.instance and not request.instance.get('software'):
+		nodeinfo = await request.app.client.fetch_nodeinfo(request.instance['domain'])
+
+		if nodeinfo:
+			request.instance['software'] = nodeinfo.sw_name
+			request.database.save()
+
+	logging.verbose(f'New "{request.message.type}" from actor: {request.actor.id}')
+	return await processors[request.message.type](request)
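The module-level `cache` above replaces the old `app.cache.objects` store for deduplication: once an object id has been relayed or forwarded, later copies are dropped. A minimal illustration (standalone, simplified to store a flag instead of the announce id):

	from cachetools import LRUCache

	cache = LRUCache(1024)

	def seen_before(objectid):
		if objectid in cache:
			return True

		cache[objectid] = True
		return False

	assert seen_before('https://example.com/note/1') is False
	assert seen_before('https://example.com/note/1') is True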
diff --git a/relay/views.py b/relay/views.py
index 8d6be5e..9cea1ef 100644
--- a/relay/views.py
+++ b/relay/views.py
@@ -1,3 +1,5 @@
+import aputils
+import asyncio
 import logging
 import subprocess
 import traceback
@@ -5,8 +7,7 @@
 from pathlib import Path

 from . import __version__, misc
-from .http_debug import STATS
-from .misc import DotDict, Message, Response, WKNodeinfo
+from .misc import DotDict, Message, Response
 from .processors import run_processor

@@ -33,10 +34,10 @@ def register_route(method, path):

 @register_route('GET', '/')
 async def home(request):
-	targets = '<br>'.join(request.app.database.hostnames)
-	note = request.app.config.note
-	count = len(request.app.database.hostnames)
-	host = request.app.config.host
+	targets = '<br>'.join(request.database.hostnames)
+	note = request.config.note
+	count = len(request.database.hostnames)
+	host = request.config.host

 	text = f"""
@@ -64,8 +65,8 @@ a:hover {{ color: #8AF; }}

 @register_route('GET', '/actor')
 async def actor(request):
 	data = Message.new_actor(
-		host = request.app.config.host,
-		pubkey = request.app.database.pubkey
+		host = request.config.host,
+		pubkey = request.database.signer.pubkey
 	)

 	return Response.new(data, ctype='activity')
@@ -74,67 +75,74 @@
 @register_route('POST', '/inbox')
 @register_route('POST', '/actor')
 async def inbox(request):
-	config = request.app.config
-	database = request.app.database
+	config = request.config
+	database = request.database

 	## reject if missing signature header
-	try:
-		signature = DotDict.new_from_signature(request.headers['signature'])
-
-	except KeyError:
+	if not request.signature:
 		logging.verbose('Actor missing signature header')
 		raise HTTPUnauthorized(body='missing signature')

-	## read message
 	try:
-		data = await request.json(loads=Message.new_from_json)
+		request['message'] = await request.json(loads=Message.new_from_json)
+
+		## reject if there is no message
+		if not request.message:
+			logging.verbose('empty message')
+			return Response.new_error(400, 'missing message', 'json')

 		## reject if there is no actor in the message
-		if 'actor' not in data:
-			logging.verbose('actor not in data')
+		if 'actor' not in request.message:
+			logging.verbose('actor not in message')
 			return Response.new_error(400, 'no actor in message', 'json')

 	except:
+		## this code should hopefully never get called
 		traceback.print_exc()
 		logging.verbose('Failed to parse inbox message')
 		return Response.new_error(400, 'failed to parse message', 'json')

-	actor = await misc.request(signature.keyid)
-	software = await misc.fetch_nodeinfo(actor.domain)
+	request['actor'] = await request.app.client.get(request.signature.keyid, sign_headers=True)

 	## reject if actor is empty
-	if not actor:
-		logging.verbose(f'Failed to fetch actor: {actor.id}')
+	if not request.actor:
+		## ld signatures aren't handled atm, so just ignore it
+		if request['message'].type == 'Delete':
+			logging.verbose(f'Instance sent a delete which cannot be handled')
+			return Response.new(status=202)
+
+		logging.verbose(f'Failed to fetch actor: {request.signature.keyid}')
 		return Response.new_error(400, 'failed to fetch actor', 'json')

+	request['instance'] = request.database.get_inbox(request['actor'].inbox)
+
 	## reject if the actor isn't whitelisted while the whitelist is enabled
-	elif config.whitelist_enabled and not config.is_whitelisted(actor.domain):
-		logging.verbose(f'Rejected actor for not being in the whitelist: {actor.id}')
+	if config.whitelist_enabled and not config.is_whitelisted(request.actor.domain):
+		logging.verbose(f'Rejected actor for not being in the whitelist: {request.actor.id}')
 		return Response.new_error(403, 'access denied', 'json')

 	## reject if actor is banned
-	if request.app['config'].is_banned(actor.domain):
-		logging.verbose(f'Ignored request from banned actor: {actor.id}')
+	if request.config.is_banned(request.actor.domain):
+		logging.verbose(f'Ignored request from banned actor: {request.actor.id}')
 		return Response.new_error(403, 'access denied', 'json')

-	## reject if software used by actor is banned
-	if config.is_banned_software(software):
-		logging.verbose(f'Rejected actor for using specific software: {software}')
-		return Response.new_error(403, 'access denied', 'json')
-
 	## reject if the signature is invalid
-	if not (await misc.validate_signature(actor, signature, request)):
+	try:
+		await request.actor.signer.validate_aiohttp_request(request)
+
+	except aputils.SignatureValidationError as e:
-		logging.verbose(f'signature validation failed for: {actor.id}')
-		return Response.new_error(401, 'signature check failed', 'json')
+		logging.verbose(f'signature validation failed for: {request.actor.id}')
+		logging.debug(str(e))
+		return Response.new_error(401, str(e), 'json')

 	## reject if activity type isn't 'Follow' and the actor isn't following
-	if data['type'] != 'Follow' and not database.get_inbox(actor.domain):
-		logging.verbose(f'Rejected actor for trying to post while not following: {actor.id}')
+	if request.message.type != 'Follow' and not database.get_inbox(request.actor.domain):
+		logging.verbose(f'Rejected actor for trying to post while not following: {request.actor.id}')
 		return Response.new_error(401, 'access denied', 'json')

-	logging.debug(f">> payload {data}")
+	logging.debug(f">> payload {request.message.to_json(4)}")

-	await run_processor(request, actor, data, software)
+	asyncio.ensure_future(run_processor(request))
 	return Response.new(status=202)

@@ -146,63 +154,38 @@ async def webfinger(request):

 	except KeyError:
 		return Response.new_error(400, 'missing \'resource\' query key', 'json')

-	if subject != f'acct:relay@{request.app.config.host}':
+	if subject != f'acct:relay@{request.config.host}':
 		return Response.new_error(404, 'user not found', 'json')

-	data = {
-		'subject': subject,
-		'aliases': [request.app.config.actor],
-		'links': [
-			{'href': request.app.config.actor, 'rel': 'self', 'type': 'application/activity+json'},
-			{'href': request.app.config.actor, 'rel': 'self', 'type': 'application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"'}
-		]
-	}
-
-	return Response.new(data, ctype='json')
-
-
-@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
-async def nodeinfo_2_0(request):
-	niversion = request.match_info['version'][:3]
-	data = {
-		'openRegistrations': not request.app.config.whitelist_enabled,
-		'protocols': ['activitypub'],
-		'services': {
-			'inbound': [],
-			'outbound': []
-		},
-		'software': {
-			'name': 'activityrelay',
-			'version': version
-		},
-		'usage': {
-			'localPosts': 0,
-			'users': {
-				'total': 1
-			}
-		},
-		'metadata': {
-			'peers': request.app.database.hostnames
-		},
-		'version': niversion
-	}
-
-	if version == '2.1':
-		data['software']['repository'] = 'https://git.pleroma.social/pleroma/relay'
-
-	return Response.new(data, ctype='json')
-
-
-@register_route('GET', '/.well-known/nodeinfo')
-async def nodeinfo_wellknown(request):
-	data = WKNodeinfo.new(
-		v20 = f'https://{request.app.config.host}/nodeinfo/2.0.json',
-		v21 = f'https://{request.app.config.host}/nodeinfo/2.1.json'
+	data = aputils.Webfinger.new(
+		handle = 'relay',
+		domain = request.config.host,
+		actor = request.config.actor
 	)

 	return Response.new(data, ctype='json')


-@register_route('GET', '/stats')
-async def stats(request):
-	return Response.new(STATS, ctype='json')
+@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
+async def nodeinfo(request):
+	niversion = request.match_info['version'][:3]
+
+	data = dict(
+		name = 'activityrelay',
+		version = version,
+		protocols = ['activitypub'],
+		open_regs = not request.config.whitelist_enabled,
+		users = 1,
+		metadata = {'peers': request.database.hostnames}
+	)
+
+	if niversion == '2.1':
+		data['repo'] = 'https://git.pleroma.social/pleroma/relay'
+
+	return Response.new(aputils.Nodeinfo.new(**data), ctype='json')
+
+
+@register_route('GET', '/.well-known/nodeinfo')
+async def nodeinfo_wellknown(request):
+	data = aputils.WellKnownNodeinfo.new_template(request.config.host)
+	return Response.new(data, ctype='json')
diff --git a/requirements.txt b/requirements.txt
index 9c558e3..9199741 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,5 @@
-.
+aiohttp>=3.8.0
+aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.3.tar.gz
+cachetools>=5.2.0
+click>=8.1.2
+pyyaml>=6.0
diff --git a/setup.cfg b/setup.cfg
index 2345151..bb97663 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = relay
-version = 0.2.3
+version = attr: relay.__version__
 description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
 long_description = file: README.md
 long_description_content_type = text/markdown; charset=UTF-8
@@ -22,13 +22,8 @@ project_urls =

 [options]
 zip_safe = False
 packages = find:
-install_requires =
-	aiohttp >= 3.8.0
-	cachetools >= 5.0.0
-	click >= 8.1.2
-	pycryptodome >= 3.14.1
-	PyYAML >= 5.0.0
-python_requires = >=3.6
+install_requires = file: requirements.txt
+python_requires = >=3.7

 [options.extras_require]
 dev =
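As a quick reference for the new `boolean()` helper added to `relay/misc.py` above, these are the coercion rules it implements (illustrative REPL-style session; requires the relay package to be importable):

	>>> from relay.misc import boolean
	>>> boolean('yes'), boolean('enabled'), boolean(1)
	(True, True, True)
	>>> boolean('off'), boolean('0'), boolean(None)
	(False, False, False)
	>>> boolean('maybe')
	Traceback (most recent call last):
	  ...
	TypeError: Cannot parse string "maybe" as a boolean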