From 8ca198b611d867a7210752d4ef4d454a3e2bf071 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sat, 5 Nov 2022 20:07:44 -0400 Subject: [PATCH 01/34] simplify misc.request --- relay/misc.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/relay/misc.py b/relay/misc.py index 50a575b..139192d 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -222,16 +222,14 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) url = urlparse(uri) method = 'POST' if data else 'GET' - headers = {'User-Agent': 'ActivityRelay'} - mimetype = 'application/activity+json' if activity else 'application/json' + action = data.get('type') if data else None + headers = { + 'Accept': 'application/activity+json, application/json;q=0.9', + 'User-Agent': 'ActivityRelay', + } - ## Set the content type for a POST - if data and 'Content-Type' not in headers: - headers['Content-Type'] = mimetype - - ## Set the accepted content type for a GET - elif not data and 'Accept' not in headers: - headers['Accept'] = mimetype + if data: + headers['Content-Type'] = 'application/activity+json' if activity else 'application/json' if sign_headers: signing_headers = { @@ -243,7 +241,6 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) if data: assert isinstance(data, dict) - action = data.get('type') data = json.dumps(data) signing_headers.update({ 'Digest': f'SHA-256={generate_body_digest(data)}', @@ -258,26 +255,24 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) headers.update(signing_headers) try: - # json_serializer=DotDict maybe? async with ClientSession(trace_configs=http_debug()) as session, app['semaphore']: async with session.request(method, uri, headers=headers, data=data) as resp: ## aiohttp has been known to leak if the response hasn't been read, ## so we're just gonna read the request no matter what - resp_data = await resp.read() - resp_payload = json.loads(resp_data.decode('utf-8')) + resp_data = await resp.json() if resp.status not in [200, 202]: - if not data: - logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_payload}') + if not resp_data: + logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') return - logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_payload}') + logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') return - logging.debug(f'{uri} >> resp {resp_payload}') + logging.debug(f'{uri} >> resp {resp_data}') - app['cache'].json[uri] = resp_payload - return resp_payload + app['cache'].json[uri] = resp_data + return resp_data except JSONDecodeError: return From c0d55cebb0fc4062f3705d73fd1b1e5f5bdee884 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sat, 5 Nov 2022 20:10:01 -0400 Subject: [PATCH 02/34] cache activity id for forwards --- relay/processors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay/processors.py b/relay/processors.py index 2f8b351..e35e14b 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -38,7 +38,7 @@ async def handle_relay(actor, data, request): async def handle_forward(actor, data, request): cache = app['cache'].objects - object_id = misc.distill_object_id(data) + object_id = data['id'] if object_id in cache: logging.verbose(f'already forwarded {object_id}') From 4d121adaa2484a0138a22df69039ed55cd6016fe Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sat, 5 Nov 2022 20:15:40 -0400 Subject: [PATCH 03/34] forward all non-Follow undos --- relay/processors.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/relay/processors.py b/relay/processors.py index e35e14b..9377b65 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -87,13 +87,9 @@ async def handle_follow(actor, data, request): async def handle_undo(actor, data, request): - ## If the activity being undone is an Announce, forward it insteead - if data['object']['type'] == 'Announce': - await handle_forward(actor, data, request) - return - - elif data['object']['type'] != 'Follow': - return + ## If the object is not a Follow, forward it + if data['object']['type'] != 'Follow': + return await handle_forward(actor, data, request) database = app['database'] inbox = database.get_inbox(actor['id']) From dcb7980c5063d73ea9b7799715f089ee14455011 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sat, 5 Nov 2022 22:15:37 -0400 Subject: [PATCH 04/34] prevent old unfollows from booting instances --- relay/database.py | 118 ++++++++++++++++++++++++++++---------------- relay/manage.py | 50 +++++++++---------- relay/misc.py | 6 +++ relay/processors.py | 13 ++--- relay/views.py | 4 +- 5 files changed, 115 insertions(+), 76 deletions(-) diff --git a/relay/database.py b/relay/database.py index 6e71f56..bc08414 100644 --- a/relay/database.py +++ b/relay/database.py @@ -6,10 +6,15 @@ from Crypto.PublicKey import RSA from urllib.parse import urlparse -class RelayDatabase: +class RelayDatabase(dict): def __init__(self, config): + dict.__init__(self, { + 'relay-list': {}, + 'private-key': None, + 'version': 1 + }) + self.config = config - self.data = None self.PRIVKEY = None @@ -25,26 +30,23 @@ class RelayDatabase: @property def privkey(self): - try: - return self.data['private-key'] - - except KeyError: - return False + return self['private-key'] @property def hostnames(self): - return [urlparse(inbox).hostname for inbox in self.inboxes] + return tuple(self['relay-list'].keys()) @property def inboxes(self): - return self.data.get('relay-list', []) + return tuple(data['inbox'] for data in self['relay-list'].values()) + return self['relay-list'] def generate_key(self): self.PRIVKEY = RSA.generate(4096) - self.data['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8') + self['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8') def load(self): @@ -52,14 +54,31 @@ class RelayDatabase: try: with self.config.db.open() as fd: - self.data = json.load(fd) + data = json.load(fd) - key = self.data.pop('actorKeys', None) + self['version'] = data.get('version', None) + self['private-key'] = data.get('private-key') - if key: - self.data['private-key'] = key.get('privateKey') + if self['version'] == None: + self['version'] = 1 + + if 'actorKeys' in data: + self['private-key'] = data['actorKeys']['privateKey'] + + for item in data.get('relay-list', []): + domain = urlparse(item).hostname + self['relay-list'][domain] = { + 'inbox': item, + 'followid': None + } + + else: + self['relay-list'] = data.get('relay-list', {}) + + for domain in self['relay-list'].keys(): + if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)): + self.del_inbox(domain) - self.data.pop('actors', None) new_db = False except FileNotFoundError: @@ -69,14 +88,6 @@ class RelayDatabase: if self.config.db.stat().st_size > 0: raise e from None - if not self.data: - logging.info('No database was found. Making a new one.') - self.data = {} - - for inbox in self.inboxes: - if self.config.is_banned(inbox) or (self.config.whitelist_enabled and not self.config.is_whitelisted(inbox)): - self.del_inbox(inbox) - if not self.privkey: logging.info("No actor keys present, generating 4096-bit RSA keypair.") self.generate_key() @@ -90,34 +101,57 @@ class RelayDatabase: def save(self): with self.config.db.open('w') as fd: - data = { - 'relay-list': self.inboxes, - 'private-key': self.privkey - } - - json.dump(data, fd, indent=4) + json.dump(self, fd, indent=4) - def get_inbox(self, domain): + def get_inbox(self, domain, fail=False): if domain.startswith('http'): domain = urlparse(domain).hostname - for inbox in self.inboxes: - if domain == urlparse(inbox).hostname: - return inbox + if domain not in self['relay-list']: + if fail: + raise KeyError(domain) + + return + + return self['relay-list'][domain] - def add_inbox(self, inbox): - assert inbox.startswith('https') - assert not self.get_inbox(inbox) + def add_inbox(self, inbox, followid=None, fail=False): + assert inbox.startswith('https'), 'Inbox must be a url' + domain = urlparse(inbox).hostname - self.data['relay-list'].append(inbox) + if self.get_inbox(domain): + if fail: + raise KeyError(domain) + + return False + + self['relay-list'][domain] = { + 'domain': domain, + 'inbox': inbox, + 'followid': followid + } + + logging.verbose(f'Added inbox to database: {inbox}') + return self['relay-list'][domain] - def del_inbox(self, inbox_url): - inbox = self.get_inbox(inbox_url) + def del_inbox(self, domain, followid=None, fail=False): + data = self.get_inbox(domain, fail=True) - if not inbox: - raise KeyError(inbox_url) + if not data['followid'] or not followid or data['followid'] == followid: + del self['relay-list'][data['domain']] + logging.verbose(f'Removed inbox from database: {data["inbox"]}') + return True - self.data['relay-list'].remove(inbox) + if fail: + raise ValueError('Follow IDs do not match') + + logging.debug(f'Follow ID does not match: db = {data["followid"]}, object = {followid}') + return False + + + def set_followid(self, domain, followid): + data = self.get_inbox(domain, fail=True) + data['followid'] = followid diff --git a/relay/manage.py b/relay/manage.py index 4aae7c7..fb76236 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -100,14 +100,12 @@ def cli_inbox_unfollow(actor): if not actor.startswith('http'): actor = f'https://{actor}/actor' - if not database.get_inbox(actor): - return click.echo(f'Error: Not following actor: {actor}') + if database.del_inbox(actor): + database.save() + run_in_loop(misc.unfollow_remote_actor, actor) + return click.echo(f'Sent unfollow message to: {actor}') - database.del_inbox(actor) - database.save() - - run_in_loop(misc.unfollow_remote_actor, actor) - click.echo(f'Sent unfollow message to: {actor}') + return click.echo(f'Error: Not following actor: {actor}') @cli_inbox.command('add') @@ -121,17 +119,14 @@ def cli_inbox_add(inbox): if not inbox.startswith('http'): inbox = f'https://{inbox}/inbox' - if database.get_inbox(inbox): - click.echo(f'Error: Inbox already in database: {inbox}') - return - if config.is_banned(inbox): - click.echo(f'Error: Refusing to add banned inbox: {inbox}') - return + return click.echo(f'Error: Refusing to add banned inbox: {inbox}') - database.add_inbox(inbox) - database.save() - click.echo(f'Added inbox to the database: {inbox}') + if database.add_inbox(inbox): + database.save() + return click.echo(f'Added inbox to the database: {inbox}') + + click.echo(f'Error: Inbox already in database: {inbox}') @cli_inbox.command('remove') @@ -140,14 +135,17 @@ def cli_inbox_remove(inbox): 'Remove an inbox from the database' database = app['database'] - dbinbox = database.get_inbox(inbox) - if not dbinbox: + try: + dbinbox = database.get_inbox(inbox, fail=True) + + except KeyError: click.echo(f'Error: Inbox does not exist: {inbox}') return - database.del_inbox(dbinbox) + database.del_inbox(dbinbox['domain']) database.save() + click.echo(f'Removed inbox from the database: {inbox}') @@ -174,13 +172,14 @@ def cli_instance_ban(target): config = app['config'] database = app['database'] - inbox = database.get_inbox(target) + + if target.startswith('http'): + target = urlparse(target).hostname if config.ban_instance(target): config.save() - if inbox: - database.del_inbox(inbox) + if database.del_inbox(target): database.save() click.echo(f'Banned instance: {target}') @@ -321,16 +320,15 @@ def cli_whitelist_remove(instance): config = app['config'] database = app['database'] - inbox = database.get_inbox(instance) if not config.del_whitelist(instance): return click.echo(f'Instance not in the whitelist: {instance}') config.save() - if inbox and config.whitelist_enabled: - database.del_inbox(inbox) - database.save() + if config.whitelist_enabled: + if database.del_inbox(inbox): + database.save() click.echo(f'Removed instance from the whitelist: {instance}') diff --git a/relay/misc.py b/relay/misc.py index 139192d..9161f53 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -255,6 +255,12 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) headers.update(signing_headers) try: + if data: + logging.verbose(f'Sending "{action}" to inbox: {uri}') + + else: + logging.verbose(f'Sending GET request to url: {uri}') + async with ClientSession(trace_configs=http_debug()) as session, app['semaphore']: async with session.request(method, uri, headers=headers, data=data) as resp: ## aiohttp has been known to leak if the response hasn't been read, diff --git a/relay/processors.py b/relay/processors.py index 9377b65..b53ace6 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -60,11 +60,13 @@ async def handle_follow(actor, data, request): database = app['database'] inbox = misc.get_actor_inbox(actor) + dbinbox = database.get_inbox(inbox) - if inbox not in database.inboxes: - database.add_inbox(inbox) + if not database.add_inbox(inbox, data['id']): + database.set_followid(inbox, data['id']) database.save() - asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) + + asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) message = { "@context": "https://www.w3.org/ns/activitystreams", @@ -92,12 +94,11 @@ async def handle_undo(actor, data, request): return await handle_forward(actor, data, request) database = app['database'] - inbox = database.get_inbox(actor['id']) + objectid = misc.distill_object_id(data) - if not inbox: + if not database.del_inbox(actor['id'], objectid): return - database.del_inbox(inbox) database.save() await misc.unfollow_remote_actor(actor['id']) diff --git a/relay/views.py b/relay/views.py index 6eac9d7..a5da8ff 100644 --- a/relay/views.py +++ b/relay/views.py @@ -37,7 +37,7 @@ a:hover {{ color: #8AF; }}

You may subscribe to this relay with the address: https://{host}/actor

To host your own relay, you may download the code at this address: https://git.pleroma.social/pleroma/relay


List of {count} registered instances:
{targets}

-""".format(host=request.host, note=app['config'].note, targets=targets, count=len(app['database'].inboxes)) +""".format(host=request.host, note=app['config'].note, targets=targets, count=len(app['database'].hostnames)) return Response( status = 200, @@ -89,11 +89,11 @@ async def inbox(request): actor_id = data['actor'] actor_domain = urlparse(actor_id).hostname + ## reject if there is no actor in the message except KeyError: logging.verbose('actor not in data') raise HTTPUnauthorized(body='no actor in message') - ## reject if there is no actor in the message except: traceback.print_exc() logging.verbose('Failed to parse inbox message') From f713f543069469d095a645149e1b4540878771ae Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sun, 6 Nov 2022 01:11:36 -0500 Subject: [PATCH 05/34] announce forwarded messages --- relay/processors.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/relay/processors.py b/relay/processors.py index b53ace6..0ca2358 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -44,15 +44,26 @@ async def handle_forward(actor, data, request): logging.verbose(f'already forwarded {object_id}') return + activity_id = f"https://{request.host}/activities/{uuid4()}" + + message = { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Announce", + "to": [f"https://{request.host}/followers"], + "actor": f"https://{request.host}/actor", + "object": data, + "id": activity_id + } + logging.verbose(f'Forwarding post from {actor["id"]}') logging.debug(f'>> Relay {data}') inboxes = misc.distill_inboxes(actor, object_id) - futures = [misc.request(inbox, data=data) for inbox in inboxes] + futures = [misc.request(inbox, data=message) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) - cache[object_id] = object_id + cache[object_id] = activity_id async def handle_follow(actor, data, request): From 3b85e2c2f2141c8cafdc4fdbebbb049baa078e12 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Sun, 6 Nov 2022 01:11:54 -0500 Subject: [PATCH 06/34] move DotDict to misc --- relay/config.py | 41 ++--------------------------------------- relay/misc.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/relay/config.py b/relay/config.py index 951b6f3..b363580 100644 --- a/relay/config.py +++ b/relay/config.py @@ -4,6 +4,8 @@ import yaml from pathlib import Path from urllib.parse import urlparse +from .misc import DotDict + relay_software_names = [ 'activityrelay', @@ -13,45 +15,6 @@ relay_software_names = [ ] -class DotDict(dict): - def __getattr__(self, k): - try: - return self[k] - - except KeyError: - raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None - - - def __setattr__(self, k, v): - try: - if k in self._ignore_keys: - super().__setattr__(k, v) - - except AttributeError: - pass - - if k.startswith('_'): - super().__setattr__(k, v) - - else: - self[k] = v - - - def __setitem__(self, k, v): - if type(v) == dict: - v = DotDict(v) - - super().__setitem__(k, v) - - - def __delattr__(self, k): - try: - dict.__delitem__(self, k) - - except KeyError: - raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None - - class RelayConfig(DotDict): apkeys = { 'host', diff --git a/relay/misc.py b/relay/misc.py index 9161f53..7323633 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -318,3 +318,44 @@ async def validate_signature(actor, http_request): logging.debug(f'validates? {result}') return result + + +class DotDict(dict): + def __getattr__(self, k): + try: + return self[k] + + except KeyError: + raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None + + + def __setattr__(self, k, v): + if k.startswith('_'): + super().__setattr__(k, v) + + else: + self[k] = v + + + def __setitem__(self, k, v): + if type(v) == dict: + v = DotDict(v) + + super().__setitem__(k, v) + + + def __delattr__(self, k): + try: + dict.__delitem__(self, k) + + except KeyError: + raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None + + + @classmethod + def new_from_json(cls, data): + return cls(json.loads(data)) + + + def to_json(self, indent=None): + return json.dumps(self, indent=indent) From c66f9d34b3b36e841987e91e0d5fec0cb28c7fa7 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Mon, 7 Nov 2022 05:30:13 -0500 Subject: [PATCH 07/34] create Message class --- relay/manage.py | 4 +- relay/misc.py | 183 +++++++++++++++++++++++++++++++++++--------- relay/processors.py | 130 +++++++++++++++---------------- relay/views.py | 58 ++++++-------- 4 files changed, 229 insertions(+), 146 deletions(-) diff --git a/relay/manage.py b/relay/manage.py index fb76236..7c64f2b 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -81,9 +81,7 @@ def cli_inbox_follow(actor): if not actor_data: return click.echo(f'Error: Failed to fetch actor: {actor}') - inbox = misc.get_actor_inbox(actor_data) - - database.add_inbox(inbox) + database.add_inbox(actor_data.shared_inbox) database.save() run_in_loop(misc.follow_remote_actor, actor) diff --git a/relay/misc.py b/relay/misc.py index 7323633..3185f51 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -4,6 +4,7 @@ import json import logging import socket import traceback +import uuid from Crypto.Hash import SHA, SHA256, SHA512 from Crypto.PublicKey import RSA @@ -57,27 +58,12 @@ def create_signature_header(headers): return ','.join(chunks) -def distill_object_id(activity): - logging.debug(f'>> determining object ID for {activity["object"]}') - - try: - return activity['object']['id'] - - except TypeError: - return activity['object'] - - def distill_inboxes(actor, object_id): database = app['database'] - origin_hostname = urlparse(object_id).hostname - actor_inbox = get_actor_inbox(actor) - targets = [] for inbox in database.inboxes: - if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname: - targets.append(inbox) - - return targets + if inbox != actor.shared_inbox or urlparse(inbox).hostname != urlparse(object_id).hostname: + yield inbox def generate_body_digest(body): @@ -93,10 +79,6 @@ def generate_body_digest(body): return bodyhash -def get_actor_inbox(actor): - return actor.get('endpoints', {}).get('sharedInbox', actor['inbox']) - - def sign_signing_string(sigstring, key): pkcs = PKCS1_v1_5.new(key) h = SHA256.new() @@ -162,14 +144,11 @@ async def follow_remote_actor(actor_uri): config = app['config'] actor = await request(actor_uri) - inbox = get_actor_inbox(actor) if not actor: logging.error(f'failed to fetch actor at: {actor_uri}') return - logging.verbose(f'sending follow request: {actor_uri}') - message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Follow", @@ -179,7 +158,8 @@ async def follow_remote_actor(actor_uri): "actor": f"https://{config.host}/actor" } - await request(inbox, message) + logging.verbose(f'sending follow request: {actor_uri}') + await request(actor.shared_inbox, message) async def unfollow_remote_actor(actor_uri): @@ -191,9 +171,6 @@ async def unfollow_remote_actor(actor_uri): logging.error(f'failed to fetch actor: {actor_uri}') return - inbox = get_actor_inbox(actor) - logging.verbose(f'sending unfollow request to inbox: {inbox}') - message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Undo", @@ -208,7 +185,8 @@ async def unfollow_remote_actor(actor_uri): "actor": f"https://{config.host}/actor" } - await request(inbox, message) + logging.verbose(f'sending unfollow request to inbox: {actor.shared_inbox}') + await request(actor.shared_inbox, message) async def request(uri, data=None, force=False, sign_headers=True, activity=True): @@ -265,16 +243,28 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) async with session.request(method, uri, headers=headers, data=data) as resp: ## aiohttp has been known to leak if the response hasn't been read, ## so we're just gonna read the request no matter what - resp_data = await resp.json() + resp_data = await resp.read() - if resp.status not in [200, 202]: - if not resp_data: - logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') - return - - logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') + ## Not expecting a response, so just return + if resp.status == 202: return + elif resp.status != 200: + if not resp_data: + return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') + + return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') + + if resp.content_type == 'application/activity+json': + resp_data = await resp.json(loads=Message.new_from_json) + + elif resp.content_type == 'application/json': + resp_data = await resp.json(loads=DotDict.new_from_json) + + else: + logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp_data}') + logging.debug(f'{uri} >> resp {resp_data}') app['cache'].json[uri] = resp_data @@ -354,8 +344,127 @@ class DotDict(dict): @classmethod def new_from_json(cls, data): - return cls(json.loads(data)) + if not data: + raise JSONDecodeError('Empty body', data, 1) + + try: + return cls(json.loads(data)) + + except ValueError: + raise JSONDecodeError('Invalid body', data, 1) def to_json(self, indent=None): return json.dumps(self, indent=indent) + + +class Message(DotDict): + @classmethod + def new_actor(cls, host, pubkey, description=None): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/actor', + 'type': 'Application', + 'preferredUsername': 'relay', + 'name': 'ActivityRelay', + 'summary': description or 'ActivityRelay bot', + 'followers': f'https://{host}/followers', + 'following': f'https://{host}/following', + 'inbox': f'https://{host}/inbox', + 'url': f'https://{host}/inbox', + 'endpoints': { + 'sharedInbox': f'https://{host}/inbox' + }, + 'publicKey': { + 'id': f'https://{host}/actor#main-key', + 'owner': f'https://{host}/actor', + 'publicKeyPem': pubkey + } + }) + + + @classmethod + def new_announce(cls, host, object): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Announce', + 'to': [f'https://{host}/followers'], + 'actor': f'https://{host}/actor', + 'object': object + }) + + + @classmethod + def new_follow(cls, host, actor): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'type': 'Follow', + 'to': [actor], + 'object': actor, + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'actor': f'https://{host}/actor' + }) + + + @classmethod + def new_unfollow(cls, host, actor, follow): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Undo', + 'to': [actor], + 'actor': f'https://{host}/actor', + 'object': follow + }) + + + @classmethod + def new_response(cls, host, actor, followid, accept): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Accept' if accept else 'Reject', + 'to': [actor], + 'actor': f'https://{host}/actor', + 'object': { + 'id': followid, + 'type': 'Follow', + 'object': f'https://{host}/actor', + 'actor': actor + } + }) + + + # misc properties + @property + def domain(self): + return urlparse(self.id).hostname + + + # actor properties + @property + def pubkey(self): + return self.publicKey.publicKeyPem + + + @property + def shared_inbox(self): + return self.get('endpoints', {}).get('sharedInbox', self.inbox) + + + # activity properties + @property + def actorid(self): + if isinstance(self.actor, dict): + return self.actor.id + + return self.actor + + + @property + def objectid(self): + if isinstance(self.object, dict): + return self.object.id + + return self.object diff --git a/relay/processors.py b/relay/processors.py index 0ca2358..e38902d 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -6,113 +6,103 @@ from uuid import uuid4 from . import app, misc -async def handle_relay(actor, data, request): +async def handle_relay(request, actor, data, software): cache = app['cache'].objects - object_id = misc.distill_object_id(data) + config = app['config'] - if object_id in cache: - logging.verbose(f'already relayed {object_id} as {cache[object_id]}') + if data.objectid in cache: + logging.verbose(f'already relayed {data.objectid} as {cache[data.objectid]}') return - logging.verbose(f'Relaying post from {actor["id"]}') + logging.verbose(f'Relaying post from {data.actorid}') - activity_id = f"https://{request.host}/activities/{uuid4()}" - - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Announce", - "to": [f"https://{request.host}/followers"], - "actor": f"https://{request.host}/actor", - "object": object_id, - "id": activity_id - } + message = misc.Message.new_announce( + host = config.host, + object = data.objectid + ) logging.debug(f'>> relay: {message}') - inboxes = misc.distill_inboxes(actor, object_id) + inboxes = misc.distill_inboxes(actor, data.objectid) futures = [misc.request(inbox, data=message) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) - cache[object_id] = activity_id + cache[data.objectid] = message.id -async def handle_forward(actor, data, request): +async def handle_forward(request, actor, data, software): cache = app['cache'].objects - object_id = data['id'] + config = app['config'] - if object_id in cache: - logging.verbose(f'already forwarded {object_id}') + if data.id in cache: + logging.verbose(f'already forwarded {data.id}') return - activity_id = f"https://{request.host}/activities/{uuid4()}" + message = misc.Message.new_announce( + host = config.host, + object = data + ) - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Announce", - "to": [f"https://{request.host}/followers"], - "actor": f"https://{request.host}/actor", - "object": data, - "id": activity_id - } - - logging.verbose(f'Forwarding post from {actor["id"]}') + logging.verbose(f'Forwarding post from {actor.id}') logging.debug(f'>> Relay {data}') - inboxes = misc.distill_inboxes(actor, object_id) - + inboxes = misc.distill_inboxes(actor, data.id) futures = [misc.request(inbox, data=message) for inbox in inboxes] + asyncio.ensure_future(asyncio.gather(*futures)) - - cache[object_id] = activity_id + cache[data.id] = message.id -async def handle_follow(actor, data, request): +async def handle_follow(request, actor, data, software): config = app['config'] database = app['database'] - inbox = misc.get_actor_inbox(actor) - dbinbox = database.get_inbox(inbox) + if database.add_inbox(inbox, data.id): + database.set_followid(actor.id, data.id) - if not database.add_inbox(inbox, data['id']): - database.set_followid(inbox, data['id']) - database.save() + database.save() - asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) + await misc.request( + actor.shared_inbox, + misc.Message.new_response( + host = config.host, + actor = actor.id, + followid = data.id, + accept = True + ) + ) - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Accept", - "to": [actor["id"]], - "actor": config.actor, - - # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile. - "object": { - "type": "Follow", - "id": data["id"], - "object": config.actor, - "actor": actor["id"] - }, - - "id": f"https://{request.host}/activities/{uuid4()}", - } - - asyncio.ensure_future(misc.request(inbox, message)) + # Are Akkoma and Pleroma the only two that expect a follow back? + # Ignoring only Mastodon for now + if software != 'mastodon': + misc.request( + actor.shared_inbox, + misc.Message.new_follow( + host = config.host, + actor = actor.id + ) + ) -async def handle_undo(actor, data, request): +async def handle_undo(request, actor, data, software): ## If the object is not a Follow, forward it if data['object']['type'] != 'Follow': - return await handle_forward(actor, data, request) + return await handle_forward(request, actor, data, software) database = app['database'] - objectid = misc.distill_object_id(data) - if not database.del_inbox(actor['id'], objectid): + if not database.del_inbox(actor.domain, data.id): return database.save() - await misc.unfollow_remote_actor(actor['id']) + message = misc.Message.new_unfollow( + host = config.host, + actor = actor.id, + follow = data + ) + + await misc.request(actor.shared_inbox, message) processors = { @@ -125,9 +115,9 @@ processors = { } -async def run_processor(request, data, actor): - if data['type'] not in processors: +async def run_processor(request, actor, data, software): + if data.type not in processors: return - logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}') - return await processors[data['type']](actor, data, request) + logging.verbose(f'New "{data.type}" from actor: {actor.id}') + return await processors[data.type](request, actor, data, software) diff --git a/relay/views.py b/relay/views.py index a5da8ff..b493378 100644 --- a/relay/views.py +++ b/relay/views.py @@ -3,10 +3,10 @@ import subprocess import traceback from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response -from urllib.parse import urlparse from . import __version__, app, misc from .http_debug import STATS +from .misc import Message from .processors import run_processor @@ -48,28 +48,13 @@ a:hover {{ color: #8AF; }} async def actor(request): + config = app['config'] database = app['database'] - data = { - "@context": "https://www.w3.org/ns/activitystreams", - "endpoints": { - "sharedInbox": f"https://{request.host}/inbox" - }, - "followers": f"https://{request.host}/followers", - "following": f"https://{request.host}/following", - "inbox": f"https://{request.host}/inbox", - "name": "ActivityRelay", - "type": "Application", - "id": f"https://{request.host}/actor", - "publicKey": { - "id": f"https://{request.host}/actor#main-key", - "owner": f"https://{request.host}/actor", - "publicKeyPem": database.pubkey - }, - "summary": "ActivityRelay bot", - "preferredUsername": "relay", - "url": f"https://{request.host}/actor" - } + data = Message.new_actor( + host = config.host, + pubkey = database.pubkey + ) return json_response(data, content_type='application/activity+json') @@ -85,9 +70,10 @@ async def inbox(request): ## read message and get actor id and domain try: - data = await request.json() - actor_id = data['actor'] - actor_domain = urlparse(actor_id).hostname + data = await request.json(loads=Message.new_from_json) + + if 'actor' not in data: + raise KeyError('actor') ## reject if there is no actor in the message except KeyError: @@ -99,44 +85,44 @@ async def inbox(request): logging.verbose('Failed to parse inbox message') raise HTTPUnauthorized(body='failed to parse message') - actor = await misc.request(actor_id) + actor = await misc.request(data.actorid) ## reject if actor is empty if not actor: - logging.verbose(f'Failed to fetch actor: {actor_id}') + logging.verbose(f'Failed to fetch actor: {data.actorid}') raise HTTPUnauthorized('failed to fetch actor') ## reject if the actor isn't whitelisted while the whiltelist is enabled - elif config.whitelist_enabled and not config.is_whitelisted(actor_id): - logging.verbose(f'Rejected actor for not being in the whitelist: {actor_id}') + elif config.whitelist_enabled and not config.is_whitelisted(data.domain): + logging.verbose(f'Rejected actor for not being in the whitelist: {data.actorid}') raise HTTPForbidden(body='access denied') ## reject if actor is banned - if app['config'].is_banned(actor_id): - logging.verbose(f'Ignored request from banned actor: {actor_id}') + if app['config'].is_banned(data.domain): + logging.verbose(f'Ignored request from banned actor: {data.actorid}') raise HTTPForbidden(body='access denied') ## reject if software used by actor is banned if len(config.blocked_software): - software = await misc.fetch_nodeinfo(actor_domain) + software = await misc.fetch_nodeinfo(data.domain) if config.is_banned_software(software): logging.verbose(f'Rejected actor for using specific software: {software}') raise HTTPForbidden(body='access denied') ## reject if the signature is invalid - if not (await misc.validate_signature(actor_id, request)): - logging.verbose(f'signature validation failed for: {actor_id}') + if not (await misc.validate_signature(data.actorid, request)): + logging.verbose(f'signature validation failed for: {data.actorid}') raise HTTPUnauthorized(body='signature check failed, signature did not match key') ## reject if activity type isn't 'Follow' and the actor isn't following - if data['type'] != 'Follow' and not database.get_inbox(actor_domain): - logging.verbose(f'Rejected actor for trying to post while not following: {actor_id}') + if data['type'] != 'Follow' and not database.get_inbox(data.domain): + logging.verbose(f'Rejected actor for trying to post while not following: {data.actorid}') raise HTTPUnauthorized(body='access denied') logging.debug(f">> payload {data}") - await run_processor(request, data, actor) + await run_processor(request, actor, data, software) return Response(body=b'{}', content_type='application/activity+json') From 70e4870ba9d7bb0189e6112ede6eeba0557603e9 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Mon, 7 Nov 2022 05:40:08 -0500 Subject: [PATCH 08/34] remove run_in_loop function --- relay/manage.py | 15 +++++---------- relay/misc.py | 1 + 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/relay/manage.py b/relay/manage.py index 7c64f2b..5dcad3a 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -76,7 +76,7 @@ def cli_inbox_follow(actor): if database.get_inbox(actor): return click.echo(f'Error: Already following actor: {actor}') - actor_data = run_in_loop(misc.request, actor, sign_headers=True) + actor_data = asyncio.run(misc.request(actor, sign_headers=True)) if not actor_data: return click.echo(f'Error: Failed to fetch actor: {actor}') @@ -84,7 +84,7 @@ def cli_inbox_follow(actor): database.add_inbox(actor_data.shared_inbox) database.save() - run_in_loop(misc.follow_remote_actor, actor) + asyncio.run(misc.follow_remote_actor(actor)) click.echo(f'Sent follow message to actor: {actor}') @@ -100,7 +100,7 @@ def cli_inbox_unfollow(actor): if database.del_inbox(actor): database.save() - run_in_loop(misc.unfollow_remote_actor, actor) + asyncio.run(misc.unfollow_remote_actor(actor)) return click.echo(f'Sent unfollow message to: {actor}') return click.echo(f'Error: Not following actor: {actor}') @@ -236,7 +236,7 @@ def cli_software_ban(name, fetch_nodeinfo): return click.echo('Banned all relay software') if fetch_nodeinfo: - software = run_in_loop(fetch_nodeinfo, name) + software = asyncio.run(misc.fetch_nodeinfo(name)) if not software: click.echo(f'Failed to fetch software name from domain: {name}') @@ -268,7 +268,7 @@ def cli_software_unban(name, fetch_nodeinfo): return click.echo('Unbanned all relay software') if fetch_nodeinfo: - software = run_in_loop(fetch_nodeinfo, name) + software = asyncio.run(misc.fetch_nodeinfo(name)) if not software: click.echo(f'Failed to fetch software name from domain: {name}') @@ -401,11 +401,6 @@ def relay_run(): loop.run_forever() -def run_in_loop(func, *args, **kwargs): - loop = asyncio.new_event_loop() - return loop.run_until_complete(func(*args, **kwargs)) - - async def handle_start_webserver(): config = app['config'] runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"') diff --git a/relay/misc.py b/relay/misc.py index 3185f51..8b58478 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -140,6 +140,7 @@ async def fetch_nodeinfo(domain): return False +## todo: remove follow_remote_actor and unfollow_remote_actor async def follow_remote_actor(actor_uri): config = app['config'] From 8d17749a50695046fdc00be6596ff2baa801fed0 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Mon, 7 Nov 2022 07:54:32 -0500 Subject: [PATCH 09/34] create Application class --- relay/__init__.py | 5 -- relay/application.py | 119 ++++++++++++++++++++++++++++++ relay/manage.py | 172 +++++++++++++------------------------------ relay/misc.py | 9 ++- relay/processors.py | 43 ++++------- relay/views.py | 60 ++++++++++----- 6 files changed, 233 insertions(+), 175 deletions(-) create mode 100644 relay/application.py diff --git a/relay/__init__.py b/relay/__init__.py index 9489f87..5c940ed 100644 --- a/relay/__init__.py +++ b/relay/__init__.py @@ -1,8 +1,3 @@ __version__ = '0.2.2' -from aiohttp.web import Application - from . import logger - - -app = Application() diff --git a/relay/application.py b/relay/application.py new file mode 100644 index 0000000..9acddbf --- /dev/null +++ b/relay/application.py @@ -0,0 +1,119 @@ +import asyncio +import logging +import os +import signal + +from aiohttp import web +from cachetools import LRUCache +from datetime import datetime, timedelta + +from .config import RelayConfig +from .database import RelayDatabase +from .misc import DotDict, check_open_port, set_app +from .views import routes + + +class Application(web.Application): + def __init__(self, cfgpath): + web.Application.__init__(self) + + self['starttime'] = None + self['running'] = False + self['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) + self['config'] = RelayConfig(cfgpath, self['is_docker']) + + if not self['config'].load(): + self['config'].save() + + self['database'] = RelayDatabase(self['config']) + self['database'].load() + + self['cache'] = DotDict({key: LRUCache(maxsize=self['config'][key]) for key in self['config'].cachekeys}) + self['semaphore'] = asyncio.Semaphore(self['config'].push_limit) + + self.set_signal_handler() + set_app(self) + + + @property + def cache(self): + return self['cache'] + + + @property + def config(self): + return self['config'] + + + @property + def database(self): + return self['database'] + + + @property + def is_docker(self): + return self['is_docker'] + + + @property + def semaphore(self): + return self['semaphore'] + + + @property + def uptime(self): + if not self['starttime']: + return timedelta(seconds=0) + + uptime = datetime.now() - self['starttime'] + + return timedelta(seconds=uptime.seconds) + + + def set_signal_handler(self): + signal.signal(signal.SIGHUP, self.stop) + signal.signal(signal.SIGINT, self.stop) + signal.signal(signal.SIGQUIT, self.stop) + signal.signal(signal.SIGTERM, self.stop) + + + def run(self): + if not check_open_port(self.config.listen, self.config.port): + return logging.error(f'A server is already running on port {self.config.port}') + + for route in routes: + if route[1] == '/stats' and logging.DEBUG < logging.root.level: + continue + + self.router.add_route(*route) + + logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})') + asyncio.run(self.handle_run()) + + + def stop(self, *_): + self['running'] = False + + + async def handle_run(self): + self['running'] = True + + runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"') + await runner.setup() + + site = web.TCPSite(runner, + host = self.config.listen, + port = self.config.port, + reuse_address = True + ) + + await site.start() + self['starttime'] = datetime.now() + + while self['running']: + await asyncio.sleep(0.25) + + await site.stop() + + self['starttime'] = None + self['running'] = False diff --git a/relay/manage.py b/relay/manage.py index 5dcad3a..838e2f9 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -1,17 +1,15 @@ import Crypto import asyncio import click -import json import logging -import os import platform -from aiohttp.web import AppRunner, TCPSite -from cachetools import LRUCache +from . import misc, __version__ +from .application import Application +from .config import relay_software_names -from . import app, misc, views, __version__ -from .config import DotDict, RelayConfig, relay_software_names -from .database import RelayDatabase + +app = None @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True) @@ -19,23 +17,11 @@ from .database import RelayDatabase @click.version_option(version=__version__, prog_name='ActivityRelay') @click.pass_context def cli(ctx, config): - app['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) - app['config'] = RelayConfig(config, app['is_docker']) - - if not app['config'].load(): - app['config'].save() - - app['database'] = RelayDatabase(app['config']) - app['database'].load() - - app['cache'] = DotDict() - app['semaphore'] = asyncio.Semaphore(app['config']['push_limit']) - - for key in app['config'].cachekeys: - app['cache'][key] = LRUCache(app['config'][key]) + global app + app = Application(config) if not ctx.invoked_subcommand: - if app['config'].host.endswith('example.com'): + if app.config.host.endswith('example.com'): relay_setup.callback() else: @@ -55,7 +41,7 @@ def cli_inbox_list(): click.echo('Connected to the following instances or relays:') - for inbox in app['database'].inboxes: + for inbox in app.database.inboxes: click.echo(f'- {inbox}') @@ -64,16 +50,13 @@ def cli_inbox_list(): def cli_inbox_follow(actor): 'Follow an actor (Relay must be running)' - config = app['config'] - database = app['database'] - - if config.is_banned(actor): + if app.config.is_banned(actor): return click.echo(f'Error: Refusing to follow banned actor: {actor}') if not actor.startswith('http'): actor = f'https://{actor}/actor' - if database.get_inbox(actor): + if app.database.get_inbox(actor): return click.echo(f'Error: Already following actor: {actor}') actor_data = asyncio.run(misc.request(actor, sign_headers=True)) @@ -81,8 +64,8 @@ def cli_inbox_follow(actor): if not actor_data: return click.echo(f'Error: Failed to fetch actor: {actor}') - database.add_inbox(actor_data.shared_inbox) - database.save() + app.database.add_inbox(actor_data.shared_inbox) + app.database.save() asyncio.run(misc.follow_remote_actor(actor)) click.echo(f'Sent follow message to actor: {actor}') @@ -93,13 +76,11 @@ def cli_inbox_follow(actor): def cli_inbox_unfollow(actor): 'Unfollow an actor (Relay must be running)' - database = app['database'] - if not actor.startswith('http'): actor = f'https://{actor}/actor' - if database.del_inbox(actor): - database.save() + if app.database.del_inbox(actor): + app.database.save() asyncio.run(misc.unfollow_remote_actor(actor)) return click.echo(f'Sent unfollow message to: {actor}') @@ -111,17 +92,14 @@ def cli_inbox_unfollow(actor): def cli_inbox_add(inbox): 'Add an inbox to the database' - database = app['database'] - config = app['config'] - if not inbox.startswith('http'): inbox = f'https://{inbox}/inbox' - if config.is_banned(inbox): + if app.config.is_banned(inbox): return click.echo(f'Error: Refusing to add banned inbox: {inbox}') - if database.add_inbox(inbox): - database.save() + if app.database.add_inbox(inbox): + app.database.save() return click.echo(f'Added inbox to the database: {inbox}') click.echo(f'Error: Inbox already in database: {inbox}') @@ -132,17 +110,15 @@ def cli_inbox_add(inbox): def cli_inbox_remove(inbox): 'Remove an inbox from the database' - database = app['database'] - try: - dbinbox = database.get_inbox(inbox, fail=True) + dbinbox = app.database.get_inbox(inbox, fail=True) except KeyError: click.echo(f'Error: Inbox does not exist: {inbox}') return - database.del_inbox(dbinbox['domain']) - database.save() + app.database.del_inbox(dbinbox['domain']) + app.database.save() click.echo(f'Removed inbox from the database: {inbox}') @@ -159,7 +135,7 @@ def cli_instance_list(): click.echo('Banned instances or relays:') - for domain in app['config'].blocked_instances: + for domain in app.config.blocked_instances: click.echo(f'- {domain}') @@ -168,17 +144,14 @@ def cli_instance_list(): def cli_instance_ban(target): 'Ban an instance and remove the associated inbox if it exists' - config = app['config'] - database = app['database'] - if target.startswith('http'): target = urlparse(target).hostname - if config.ban_instance(target): - config.save() + if app.config.ban_instance(target): + app.config.save() - if database.del_inbox(target): - database.save() + if app.database.del_inbox(target): + app.database.save() click.echo(f'Banned instance: {target}') return @@ -191,10 +164,8 @@ def cli_instance_ban(target): def cli_instance_unban(target): 'Unban an instance' - config = app['config'] - - if config.unban_instance(target): - config.save() + if app.config.unban_instance(target): + app.config.save() click.echo(f'Unbanned instance: {target}') return @@ -214,7 +185,7 @@ def cli_software_list(): click.echo('Banned software:') - for software in app['config'].blocked_software: + for software in app.config.blocked_software: click.echo(f'- {software}') @@ -226,13 +197,11 @@ def cli_software_list(): def cli_software_ban(name, fetch_nodeinfo): 'Ban software. Use RELAYS for NAME to ban relays' - config = app['config'] - if name == 'RELAYS': for name in relay_software_names: - config.ban_software(name) + app.config.ban_software(name) - config.save() + app.config.save() return click.echo('Banned all relay software') if fetch_nodeinfo: @@ -244,7 +213,7 @@ def cli_software_ban(name, fetch_nodeinfo): name = software if config.ban_software(name): - config.save() + app.config.save() return click.echo(f'Banned software: {name}') click.echo(f'Software already banned: {name}') @@ -258,11 +227,9 @@ def cli_software_ban(name, fetch_nodeinfo): def cli_software_unban(name, fetch_nodeinfo): 'Ban software. Use RELAYS for NAME to unban relays' - config = app['config'] - if name == 'RELAYS': for name in relay_software_names: - config.unban_software(name) + app.config.unban_software(name) config.save() return click.echo('Unbanned all relay software') @@ -275,8 +242,8 @@ def cli_software_unban(name, fetch_nodeinfo): name = software - if config.unban_software(name): - config.save() + if app.config.unban_software(name): + app.config.save() return click.echo(f'Unbanned software: {name}') click.echo(f'Software wasn\'t banned: {name}') @@ -293,7 +260,7 @@ def cli_whitelist(): def cli_whitelist_list(): click.echo('Current whitelisted domains') - for domain in app['config'].whitelist: + for domain in app.config.whitelist: click.echo(f'- {domain}') @@ -302,12 +269,10 @@ def cli_whitelist_list(): def cli_whitelist_add(instance): 'Add an instance to the whitelist' - config = app['config'] - - if not config.add_whitelist(instance): + if not app.config.add_whitelist(instance): return click.echo(f'Instance already in the whitelist: {instance}') - config.save() + app.config.save() click.echo(f'Instance added to the whitelist: {instance}') @@ -316,17 +281,14 @@ def cli_whitelist_add(instance): def cli_whitelist_remove(instance): 'Remove an instance from the whitelist' - config = app['config'] - database = app['database'] - - if not config.del_whitelist(instance): + if not app.config.del_whitelist(instance): return click.echo(f'Instance not in the whitelist: {instance}') - config.save() + app.config.save() - if config.whitelist_enabled: - if database.del_inbox(inbox): - database.save() + if app.config.whitelist_enabled: + if app.database.del_inbox(inbox): + app.database.save() click.echo(f'Removed instance from the whitelist: {instance}') @@ -335,23 +297,21 @@ def cli_whitelist_remove(instance): def relay_setup(): 'Generate a new config' - config = app['config'] - while True: - config.host = click.prompt('What domain will the relay be hosted on?', default=config.host) + app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host) if not config.host.endswith('example.com'): break click.echo('The domain must not be example.com') - config.listen = click.prompt('Which address should the relay listen on?', default=config.listen) + app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen) while True: - config.port = click.prompt('What TCP port should the relay listen on?', default=config.port, type=int) + app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int) break - config.save() + app.config.save() if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'): relay_run.callback() @@ -361,9 +321,7 @@ def relay_setup(): def relay_run(): 'Run the relay' - config = app['config'] - - if config.host.endswith('example.com'): + if app.config.host.endswith('example.com'): return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".') vers_split = platform.python_version().split('.') @@ -378,38 +336,10 @@ def relay_run(): click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome') return click.echo(pip_command) - if not misc.check_open_port(config.listen, config.port): - return click.echo(f'Error: A server is already running on port {config.port}') + if not misc.check_open_port(app.config.listen, app.config.port): + return click.echo(f'Error: A server is already running on port {app.config.port}') - # web pages - app.router.add_get('/', views.home) - - # endpoints - app.router.add_post('/actor', views.inbox) - app.router.add_post('/inbox', views.inbox) - app.router.add_get('/actor', views.actor) - app.router.add_get('/nodeinfo/2.0.json', views.nodeinfo_2_0) - app.router.add_get('/.well-known/nodeinfo', views.nodeinfo_wellknown) - app.router.add_get('/.well-known/webfinger', views.webfinger) - - if logging.DEBUG >= logging.root.level: - app.router.add_get('/stats', views.stats) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - asyncio.ensure_future(handle_start_webserver(), loop=loop) - loop.run_forever() - - -async def handle_start_webserver(): - config = app['config'] - runner = AppRunner(app, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{Referer}i" "%{User-Agent}i"') - - logging.info(f'Starting webserver at {config.host} ({config.listen}:{config.port})') - await runner.setup() - - site = TCPSite(runner, config.listen, config.port) - await site.start() + app.run() def main(): diff --git a/relay/misc.py b/relay/misc.py index 8b58478..1fc109c 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -15,10 +15,10 @@ from json.decoder import JSONDecodeError from urllib.parse import urlparse from uuid import uuid4 -from . import app from .http_debug import http_debug +app = None HASHES = { 'sha1': SHA, 'sha256': SHA256, @@ -26,6 +26,11 @@ HASHES = { } +def set_app(new_app): + global app + app = new_app + + def build_signing_string(headers, used_headers): return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers)) @@ -62,7 +67,7 @@ def distill_inboxes(actor, object_id): database = app['database'] for inbox in database.inboxes: - if inbox != actor.shared_inbox or urlparse(inbox).hostname != urlparse(object_id).hostname: + if inbox != actor.shared_inbox and urlparse(inbox).hostname != urlparse(object_id).hostname: yield inbox diff --git a/relay/processors.py b/relay/processors.py index e38902d..b8a19aa 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -3,21 +3,18 @@ import logging from uuid import uuid4 -from . import app, misc +from . import misc async def handle_relay(request, actor, data, software): - cache = app['cache'].objects - config = app['config'] - - if data.objectid in cache: - logging.verbose(f'already relayed {data.objectid} as {cache[data.objectid]}') + if data.objectid in request.app.cache.objects: + logging.verbose(f'already relayed {data.objectid}') return logging.verbose(f'Relaying post from {data.actorid}') message = misc.Message.new_announce( - host = config.host, + host = request.app.config.host, object = data.objectid ) @@ -27,19 +24,16 @@ async def handle_relay(request, actor, data, software): futures = [misc.request(inbox, data=message) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) - cache[data.objectid] = message.id + request.app.cache.objects[data.objectid] = message.id async def handle_forward(request, actor, data, software): - cache = app['cache'].objects - config = app['config'] - - if data.id in cache: + if data.id in request.app.cache.objects: logging.verbose(f'already forwarded {data.id}') return message = misc.Message.new_announce( - host = config.host, + host = request.app.config.host, object = data ) @@ -50,22 +44,19 @@ async def handle_forward(request, actor, data, software): futures = [misc.request(inbox, data=message) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) - cache[data.id] = message.id + request.app.cache.objects[data.id] = message.id async def handle_follow(request, actor, data, software): - config = app['config'] - database = app['database'] + if request.app.database.add_inbox(inbox, data.id): + request.app.database.set_followid(actor.id, data.id) - if database.add_inbox(inbox, data.id): - database.set_followid(actor.id, data.id) - - database.save() + request.app.database.save() await misc.request( actor.shared_inbox, misc.Message.new_response( - host = config.host, + host = request.app.config.host, actor = actor.id, followid = data.id, accept = True @@ -78,7 +69,7 @@ async def handle_follow(request, actor, data, software): misc.request( actor.shared_inbox, misc.Message.new_follow( - host = config.host, + host = request.app.config.host, actor = actor.id ) ) @@ -89,15 +80,13 @@ async def handle_undo(request, actor, data, software): if data['object']['type'] != 'Follow': return await handle_forward(request, actor, data, software) - database = app['database'] - - if not database.del_inbox(actor.domain, data.id): + if not request.app.database.del_inbox(actor.domain, data.id): return - database.save() + request.app.database.save() message = misc.Message.new_unfollow( - host = config.host, + host = request.app.config.host, actor = actor.id, follow = data ) diff --git a/relay/views.py b/relay/views.py index b493378..676024a 100644 --- a/relay/views.py +++ b/relay/views.py @@ -2,14 +2,25 @@ import logging import subprocess import traceback -from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response +from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response, route -from . import __version__, app, misc +from . import __version__, misc from .http_debug import STATS from .misc import Message from .processors import run_processor +routes = [] + + +def register_route(method, path): + def wrapper(func): + routes.append([method, path, func]) + return func + + return wrapper + + try: commit_label = subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode('ascii') version = f'{__version__} {commit_label}' @@ -18,9 +29,14 @@ except: version = __version__ +@register_route('GET', '/') async def home(request): - targets = '
'.join(app['database'].hostnames) - text = """ + targets = '
'.join(request.app.database.hostnames) + note = request.app.config.note + count = len(request.app.database.hostnames) + host = request.app.config.host + + text = f""" ActivityPub Relay at {host}