import aiohttp import aiohttp.web import asyncio import logging import uuid import re import simplejson as json import cgi import datetime from urllib.parse import urlsplit from Crypto.PublicKey import RSA from cachetools import LFUCache from . import app, CONFIG from .database import DATABASE from .http_debug import http_debug from .remote_actor import fetch_actor from .http_signatures import sign_headers, generate_body_digest # generate actor keys if not present if "actorKeys" not in DATABASE: logging.info("No actor keys present, generating 4096-bit RSA keypair.") privkey = RSA.generate(4096) pubkey = privkey.publickey() DATABASE["actorKeys"] = { "publicKey": pubkey.exportKey('PEM').decode('utf-8'), "privateKey": privkey.exportKey('PEM').decode('utf-8') } PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"]) PUBKEY = PRIVKEY.publickey() AP_CONFIG = CONFIG['ap'] CACHE_SIZE = CONFIG.get('cache-size', 16384) CACHE = LFUCache(CACHE_SIZE) sem = asyncio.Semaphore(500) async def actor(request): data = { "@context": "https://www.w3.org/ns/activitystreams", "endpoints": { "sharedInbox": "https://{}/inbox".format(request.host) }, "followers": "https://{}/followers".format(request.host), "following": "https://{}/following".format(request.host), "inbox": "https://{}/inbox".format(request.host), "name": "ActivityRelay", "type": "Application", "id": "https://{}/actor".format(request.host), "publicKey": { "id": "https://{}/actor#main-key".format(request.host), "owner": "https://{}/actor".format(request.host), "publicKeyPem": DATABASE["actorKeys"]["publicKey"] }, "summary": "ActivityRelay bot", "preferredUsername": "relay", "url": "https://{}/actor".format(request.host) } return aiohttp.web.json_response(data, content_type='application/activity+json') app.router.add_get('/actor', actor) get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox']) async def push_message_to_actor(actor, message, our_key_id): inbox = get_actor_inbox(actor) url = urlsplit(inbox) # XXX: Digest data = json.dumps(message) headers = { '(request-target)': 'post {}'.format(url.path), 'Content-Length': str(len(data)), 'Content-Type': 'application/activity+json', 'User-Agent': 'ActivityRelay', 'Host': url.netloc, 'Digest': 'SHA-256={}'.format(generate_body_digest(data)), 'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') } headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id) headers.pop('(request-target)') headers.pop('Host') logging.debug('%r >> %r', inbox, message) global sem async with sem: try: async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session: async with session.post(inbox, data=data, headers=headers) as resp: if resp.status == 202: return resp_payload = await resp.text() logging.debug('%r >> resp %r', inbox, resp_payload) except Exception as e: logging.info('Caught %r while pushing to %r.', e, inbox) async def fetch_nodeinfo(domain): headers = {'Accept': 'application/json'} nodeinfo_url = None wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers) if not wk_nodeinfo: return for link in wk_nodeinfo.get('links', ''): if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0': nodeinfo_url = link['href'] break if not nodeinfo_url: return nodeinfo_data = await fetch_actor(nodeinfo_url, headers=headers) software = nodeinfo_data.get('software') return software.get('name') if software else None async def follow_remote_actor(actor_uri): actor = await fetch_actor(actor_uri) if not actor: logging.info('failed to fetch actor at: %r', actor_uri) return if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']: logging.info('refusing to follow non-whitelisted actor: %r', actor_uri) return logging.info('following: %r', actor_uri) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Follow", "to": [actor['id']], "object": actor['id'], "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()), "actor": "https://{}/actor".format(AP_CONFIG['host']) } await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host'])) async def unfollow_remote_actor(actor_uri): actor = await fetch_actor(actor_uri) if not actor: logging.info('failed to fetch actor at: %r', actor_uri) return logging.info('unfollowing: %r', actor_uri) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Undo", "to": [actor['id']], "object": { "type": "Follow", "object": actor_uri, "actor": actor['id'], "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()) }, "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()), "actor": "https://{}/actor".format(AP_CONFIG['host']) } await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host'])) tag_re = re.compile(r'(|<[^>]*>)') def strip_html(data): no_tags = tag_re.sub('', data) return cgi.escape(no_tags) def distill_inboxes(actor, object_id): global DATABASE origin_hostname = urlsplit(object_id).hostname inbox = get_actor_inbox(actor) targets = [target for target in DATABASE.get('relay-list', []) if target != inbox] targets = [target for target in targets if urlsplit(target).hostname != origin_hostname] hostnames = [urlsplit(target).hostname for target in targets] assert inbox not in targets assert origin_hostname not in hostnames return targets def distill_object_id(activity): logging.debug('>> determining object ID for %r', activity['object']) obj = activity['object'] if isinstance(obj, str): return obj return obj['id'] async def handle_relay(actor, data, request): global CACHE object_id = distill_object_id(data) if object_id in CACHE: logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id]) return activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4()) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Announce", "to": ["https://{}/followers".format(request.host)], "actor": "https://{}/actor".format(request.host), "object": object_id, "id": activity_id } logging.debug('>> relay: %r', message) inboxes = distill_inboxes(actor, object_id) futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) CACHE[object_id] = activity_id async def handle_forward(actor, data, request): object_id = distill_object_id(data) logging.debug('>> Relay %r', data) inboxes = distill_inboxes(actor, object_id) futures = [ push_message_to_actor( {'inbox': inbox}, data, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) async def handle_follow(actor, data, request): global DATABASE following = DATABASE.get('relay-list', []) inbox = get_actor_inbox(actor) if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']: return if inbox not in following: following += [inbox] DATABASE['relay-list'] = following asyncio.ensure_future(follow_remote_actor(actor['id'])) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Accept", "to": [actor["id"]], "actor": "https://{}/actor".format(request.host), # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile. "object": { "type": "Follow", "id": data["id"], "object": "https://{}/actor".format(request.host), "actor": actor["id"] }, "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()), } asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host))) async def handle_undo(actor, data, request): global DATABASE child = data['object'] if child['type'] == 'Follow': following = DATABASE.get('relay-list', []) inbox = get_actor_inbox(actor) if inbox in following: following.remove(inbox) DATABASE['relay-list'] = following await unfollow_remote_actor(actor['id']) processors = { 'Announce': handle_relay, 'Create': handle_relay, 'Delete': handle_forward, 'Follow': handle_follow, 'Undo': handle_undo, 'Update': handle_forward, } async def inbox(request): data = await request.json() instance = urlsplit(data['actor']).hostname if AP_CONFIG['blocked_software']: software = await fetch_nodeinfo(instance) if software and software.lower() in AP_CONFIG['blocked_software']: raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain') if 'actor' not in data or not request['validated']: raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']: raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']: raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') actor = await fetch_actor(data["actor"]) actor_uri = 'https://{}/actor'.format(request.host) logging.debug(">> payload %r", data) processor = processors.get(data['type'], None) if processor: await processor(actor, data, request) return aiohttp.web.Response(body=b'{}', content_type='application/activity+json') app.router.add_post('/inbox', inbox)