From c66f9d34b3b36e841987e91e0d5fec0cb28c7fa7 Mon Sep 17 00:00:00 2001 From: Izalia Mae Date: Mon, 7 Nov 2022 05:30:13 -0500 Subject: [PATCH] create Message class --- relay/manage.py | 4 +- relay/misc.py | 183 +++++++++++++++++++++++++++++++++++--------- relay/processors.py | 130 +++++++++++++++---------------- relay/views.py | 58 ++++++-------- 4 files changed, 229 insertions(+), 146 deletions(-) diff --git a/relay/manage.py b/relay/manage.py index fb76236..7c64f2b 100644 --- a/relay/manage.py +++ b/relay/manage.py @@ -81,9 +81,7 @@ def cli_inbox_follow(actor): if not actor_data: return click.echo(f'Error: Failed to fetch actor: {actor}') - inbox = misc.get_actor_inbox(actor_data) - - database.add_inbox(inbox) + database.add_inbox(actor_data.shared_inbox) database.save() run_in_loop(misc.follow_remote_actor, actor) diff --git a/relay/misc.py b/relay/misc.py index 7323633..3185f51 100644 --- a/relay/misc.py +++ b/relay/misc.py @@ -4,6 +4,7 @@ import json import logging import socket import traceback +import uuid from Crypto.Hash import SHA, SHA256, SHA512 from Crypto.PublicKey import RSA @@ -57,27 +58,12 @@ def create_signature_header(headers): return ','.join(chunks) -def distill_object_id(activity): - logging.debug(f'>> determining object ID for {activity["object"]}') - - try: - return activity['object']['id'] - - except TypeError: - return activity['object'] - - def distill_inboxes(actor, object_id): database = app['database'] - origin_hostname = urlparse(object_id).hostname - actor_inbox = get_actor_inbox(actor) - targets = [] for inbox in database.inboxes: - if inbox != actor_inbox or urlparse(inbox).hostname != origin_hostname: - targets.append(inbox) - - return targets + if inbox != actor.shared_inbox or urlparse(inbox).hostname != urlparse(object_id).hostname: + yield inbox def generate_body_digest(body): @@ -93,10 +79,6 @@ def generate_body_digest(body): return bodyhash -def get_actor_inbox(actor): - return actor.get('endpoints', {}).get('sharedInbox', actor['inbox']) - - def sign_signing_string(sigstring, key): pkcs = PKCS1_v1_5.new(key) h = SHA256.new() @@ -162,14 +144,11 @@ async def follow_remote_actor(actor_uri): config = app['config'] actor = await request(actor_uri) - inbox = get_actor_inbox(actor) if not actor: logging.error(f'failed to fetch actor at: {actor_uri}') return - logging.verbose(f'sending follow request: {actor_uri}') - message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Follow", @@ -179,7 +158,8 @@ async def follow_remote_actor(actor_uri): "actor": f"https://{config.host}/actor" } - await request(inbox, message) + logging.verbose(f'sending follow request: {actor_uri}') + await request(actor.shared_inbox, message) async def unfollow_remote_actor(actor_uri): @@ -191,9 +171,6 @@ async def unfollow_remote_actor(actor_uri): logging.error(f'failed to fetch actor: {actor_uri}') return - inbox = get_actor_inbox(actor) - logging.verbose(f'sending unfollow request to inbox: {inbox}') - message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Undo", @@ -208,7 +185,8 @@ async def unfollow_remote_actor(actor_uri): "actor": f"https://{config.host}/actor" } - await request(inbox, message) + logging.verbose(f'sending unfollow request to inbox: {actor.shared_inbox}') + await request(actor.shared_inbox, message) async def request(uri, data=None, force=False, sign_headers=True, activity=True): @@ -265,16 +243,28 @@ async def request(uri, data=None, force=False, sign_headers=True, activity=True) async with session.request(method, uri, headers=headers, data=data) as resp: ## aiohttp has been known to leak if the response hasn't been read, ## so we're just gonna read the request no matter what - resp_data = await resp.json() + resp_data = await resp.read() - if resp.status not in [200, 202]: - if not resp_data: - logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') - return - - logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') + ## Not expecting a response, so just return + if resp.status == 202: return + elif resp.status != 200: + if not resp_data: + return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}') + + return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}') + + if resp.content_type == 'application/activity+json': + resp_data = await resp.json(loads=Message.new_from_json) + + elif resp.content_type == 'application/json': + resp_data = await resp.json(loads=DotDict.new_from_json) + + else: + logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}') + return logging.debug(f'Response: {resp_data}') + logging.debug(f'{uri} >> resp {resp_data}') app['cache'].json[uri] = resp_data @@ -354,8 +344,127 @@ class DotDict(dict): @classmethod def new_from_json(cls, data): - return cls(json.loads(data)) + if not data: + raise JSONDecodeError('Empty body', data, 1) + + try: + return cls(json.loads(data)) + + except ValueError: + raise JSONDecodeError('Invalid body', data, 1) def to_json(self, indent=None): return json.dumps(self, indent=indent) + + +class Message(DotDict): + @classmethod + def new_actor(cls, host, pubkey, description=None): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/actor', + 'type': 'Application', + 'preferredUsername': 'relay', + 'name': 'ActivityRelay', + 'summary': description or 'ActivityRelay bot', + 'followers': f'https://{host}/followers', + 'following': f'https://{host}/following', + 'inbox': f'https://{host}/inbox', + 'url': f'https://{host}/inbox', + 'endpoints': { + 'sharedInbox': f'https://{host}/inbox' + }, + 'publicKey': { + 'id': f'https://{host}/actor#main-key', + 'owner': f'https://{host}/actor', + 'publicKeyPem': pubkey + } + }) + + + @classmethod + def new_announce(cls, host, object): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Announce', + 'to': [f'https://{host}/followers'], + 'actor': f'https://{host}/actor', + 'object': object + }) + + + @classmethod + def new_follow(cls, host, actor): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'type': 'Follow', + 'to': [actor], + 'object': actor, + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'actor': f'https://{host}/actor' + }) + + + @classmethod + def new_unfollow(cls, host, actor, follow): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Undo', + 'to': [actor], + 'actor': f'https://{host}/actor', + 'object': follow + }) + + + @classmethod + def new_response(cls, host, actor, followid, accept): + return cls({ + '@context': 'https://www.w3.org/ns/activitystreams', + 'id': f'https://{host}/activities/{uuid.uuid4()}', + 'type': 'Accept' if accept else 'Reject', + 'to': [actor], + 'actor': f'https://{host}/actor', + 'object': { + 'id': followid, + 'type': 'Follow', + 'object': f'https://{host}/actor', + 'actor': actor + } + }) + + + # misc properties + @property + def domain(self): + return urlparse(self.id).hostname + + + # actor properties + @property + def pubkey(self): + return self.publicKey.publicKeyPem + + + @property + def shared_inbox(self): + return self.get('endpoints', {}).get('sharedInbox', self.inbox) + + + # activity properties + @property + def actorid(self): + if isinstance(self.actor, dict): + return self.actor.id + + return self.actor + + + @property + def objectid(self): + if isinstance(self.object, dict): + return self.object.id + + return self.object diff --git a/relay/processors.py b/relay/processors.py index 0ca2358..e38902d 100644 --- a/relay/processors.py +++ b/relay/processors.py @@ -6,113 +6,103 @@ from uuid import uuid4 from . import app, misc -async def handle_relay(actor, data, request): +async def handle_relay(request, actor, data, software): cache = app['cache'].objects - object_id = misc.distill_object_id(data) + config = app['config'] - if object_id in cache: - logging.verbose(f'already relayed {object_id} as {cache[object_id]}') + if data.objectid in cache: + logging.verbose(f'already relayed {data.objectid} as {cache[data.objectid]}') return - logging.verbose(f'Relaying post from {actor["id"]}') + logging.verbose(f'Relaying post from {data.actorid}') - activity_id = f"https://{request.host}/activities/{uuid4()}" - - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Announce", - "to": [f"https://{request.host}/followers"], - "actor": f"https://{request.host}/actor", - "object": object_id, - "id": activity_id - } + message = misc.Message.new_announce( + host = config.host, + object = data.objectid + ) logging.debug(f'>> relay: {message}') - inboxes = misc.distill_inboxes(actor, object_id) + inboxes = misc.distill_inboxes(actor, data.objectid) futures = [misc.request(inbox, data=message) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) - cache[object_id] = activity_id + cache[data.objectid] = message.id -async def handle_forward(actor, data, request): +async def handle_forward(request, actor, data, software): cache = app['cache'].objects - object_id = data['id'] + config = app['config'] - if object_id in cache: - logging.verbose(f'already forwarded {object_id}') + if data.id in cache: + logging.verbose(f'already forwarded {data.id}') return - activity_id = f"https://{request.host}/activities/{uuid4()}" + message = misc.Message.new_announce( + host = config.host, + object = data + ) - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Announce", - "to": [f"https://{request.host}/followers"], - "actor": f"https://{request.host}/actor", - "object": data, - "id": activity_id - } - - logging.verbose(f'Forwarding post from {actor["id"]}') + logging.verbose(f'Forwarding post from {actor.id}') logging.debug(f'>> Relay {data}') - inboxes = misc.distill_inboxes(actor, object_id) - + inboxes = misc.distill_inboxes(actor, data.id) futures = [misc.request(inbox, data=message) for inbox in inboxes] + asyncio.ensure_future(asyncio.gather(*futures)) - - cache[object_id] = activity_id + cache[data.id] = message.id -async def handle_follow(actor, data, request): +async def handle_follow(request, actor, data, software): config = app['config'] database = app['database'] - inbox = misc.get_actor_inbox(actor) - dbinbox = database.get_inbox(inbox) + if database.add_inbox(inbox, data.id): + database.set_followid(actor.id, data.id) - if not database.add_inbox(inbox, data['id']): - database.set_followid(inbox, data['id']) - database.save() + database.save() - asyncio.ensure_future(misc.follow_remote_actor(actor['id'])) + await misc.request( + actor.shared_inbox, + misc.Message.new_response( + host = config.host, + actor = actor.id, + followid = data.id, + accept = True + ) + ) - message = { - "@context": "https://www.w3.org/ns/activitystreams", - "type": "Accept", - "to": [actor["id"]], - "actor": config.actor, - - # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile. - "object": { - "type": "Follow", - "id": data["id"], - "object": config.actor, - "actor": actor["id"] - }, - - "id": f"https://{request.host}/activities/{uuid4()}", - } - - asyncio.ensure_future(misc.request(inbox, message)) + # Are Akkoma and Pleroma the only two that expect a follow back? + # Ignoring only Mastodon for now + if software != 'mastodon': + misc.request( + actor.shared_inbox, + misc.Message.new_follow( + host = config.host, + actor = actor.id + ) + ) -async def handle_undo(actor, data, request): +async def handle_undo(request, actor, data, software): ## If the object is not a Follow, forward it if data['object']['type'] != 'Follow': - return await handle_forward(actor, data, request) + return await handle_forward(request, actor, data, software) database = app['database'] - objectid = misc.distill_object_id(data) - if not database.del_inbox(actor['id'], objectid): + if not database.del_inbox(actor.domain, data.id): return database.save() - await misc.unfollow_remote_actor(actor['id']) + message = misc.Message.new_unfollow( + host = config.host, + actor = actor.id, + follow = data + ) + + await misc.request(actor.shared_inbox, message) processors = { @@ -125,9 +115,9 @@ processors = { } -async def run_processor(request, data, actor): - if data['type'] not in processors: +async def run_processor(request, actor, data, software): + if data.type not in processors: return - logging.verbose(f'New activity from actor: {actor["id"]} {data["type"]}') - return await processors[data['type']](actor, data, request) + logging.verbose(f'New "{data.type}" from actor: {actor.id}') + return await processors[data.type](request, actor, data, software) diff --git a/relay/views.py b/relay/views.py index a5da8ff..b493378 100644 --- a/relay/views.py +++ b/relay/views.py @@ -3,10 +3,10 @@ import subprocess import traceback from aiohttp.web import HTTPForbidden, HTTPUnauthorized, Response, json_response -from urllib.parse import urlparse from . import __version__, app, misc from .http_debug import STATS +from .misc import Message from .processors import run_processor @@ -48,28 +48,13 @@ a:hover {{ color: #8AF; }} async def actor(request): + config = app['config'] database = app['database'] - data = { - "@context": "https://www.w3.org/ns/activitystreams", - "endpoints": { - "sharedInbox": f"https://{request.host}/inbox" - }, - "followers": f"https://{request.host}/followers", - "following": f"https://{request.host}/following", - "inbox": f"https://{request.host}/inbox", - "name": "ActivityRelay", - "type": "Application", - "id": f"https://{request.host}/actor", - "publicKey": { - "id": f"https://{request.host}/actor#main-key", - "owner": f"https://{request.host}/actor", - "publicKeyPem": database.pubkey - }, - "summary": "ActivityRelay bot", - "preferredUsername": "relay", - "url": f"https://{request.host}/actor" - } + data = Message.new_actor( + host = config.host, + pubkey = database.pubkey + ) return json_response(data, content_type='application/activity+json') @@ -85,9 +70,10 @@ async def inbox(request): ## read message and get actor id and domain try: - data = await request.json() - actor_id = data['actor'] - actor_domain = urlparse(actor_id).hostname + data = await request.json(loads=Message.new_from_json) + + if 'actor' not in data: + raise KeyError('actor') ## reject if there is no actor in the message except KeyError: @@ -99,44 +85,44 @@ async def inbox(request): logging.verbose('Failed to parse inbox message') raise HTTPUnauthorized(body='failed to parse message') - actor = await misc.request(actor_id) + actor = await misc.request(data.actorid) ## reject if actor is empty if not actor: - logging.verbose(f'Failed to fetch actor: {actor_id}') + logging.verbose(f'Failed to fetch actor: {data.actorid}') raise HTTPUnauthorized('failed to fetch actor') ## reject if the actor isn't whitelisted while the whiltelist is enabled - elif config.whitelist_enabled and not config.is_whitelisted(actor_id): - logging.verbose(f'Rejected actor for not being in the whitelist: {actor_id}') + elif config.whitelist_enabled and not config.is_whitelisted(data.domain): + logging.verbose(f'Rejected actor for not being in the whitelist: {data.actorid}') raise HTTPForbidden(body='access denied') ## reject if actor is banned - if app['config'].is_banned(actor_id): - logging.verbose(f'Ignored request from banned actor: {actor_id}') + if app['config'].is_banned(data.domain): + logging.verbose(f'Ignored request from banned actor: {data.actorid}') raise HTTPForbidden(body='access denied') ## reject if software used by actor is banned if len(config.blocked_software): - software = await misc.fetch_nodeinfo(actor_domain) + software = await misc.fetch_nodeinfo(data.domain) if config.is_banned_software(software): logging.verbose(f'Rejected actor for using specific software: {software}') raise HTTPForbidden(body='access denied') ## reject if the signature is invalid - if not (await misc.validate_signature(actor_id, request)): - logging.verbose(f'signature validation failed for: {actor_id}') + if not (await misc.validate_signature(data.actorid, request)): + logging.verbose(f'signature validation failed for: {data.actorid}') raise HTTPUnauthorized(body='signature check failed, signature did not match key') ## reject if activity type isn't 'Follow' and the actor isn't following - if data['type'] != 'Follow' and not database.get_inbox(actor_domain): - logging.verbose(f'Rejected actor for trying to post while not following: {actor_id}') + if data['type'] != 'Follow' and not database.get_inbox(data.domain): + logging.verbose(f'Rejected actor for trying to post while not following: {data.actorid}') raise HTTPUnauthorized(body='access denied') logging.debug(f">> payload {data}") - await run_processor(request, data, actor) + await run_processor(request, actor, data, software) return Response(body=b'{}', content_type='application/activity+json')