import aiohttp
import aiohttp.web
import asyncio
import logging
import uuid
import re
import simplejson as json
import html
import time
import random

from urllib.parse import urlsplit
from Crypto.PublicKey import RSA
from cachetools import LFUCache

from .database import DATABASE
from .http_debug import http_debug


# generate actor keys if not present
if "actorKeys" not in DATABASE:
    logging.info("No actor keys present, generating 4096-bit RSA keypair.")

    privkey = RSA.generate(4096)
    pubkey = privkey.publickey()

    DATABASE["actorKeys"] = {
        "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
        "privateKey": privkey.exportKey('PEM').decode('utf-8')
    }


PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()


from . import app, CONFIG
from .remote_actor import fetch_actor


AP_CONFIG = CONFIG.get('ap', {'host': 'localhost', 'blocked_instances': []})
CACHE_SIZE = CONFIG.get('cache-size', 16384)

INBOUND_STATS = {'processed': 0, 'rejected': 0}
CACHE = LFUCache(CACHE_SIZE)


async def actor(request):
    # the relay's own actor document, served at /actor
    data = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {
            "sharedInbox": "https://{}/inbox".format(request.host)
        },
        "followers": "https://{}/followers".format(request.host),
        "following": "https://{}/following".format(request.host),
        "inbox": "https://{}/inbox".format(request.host),
        "name": "ActivityRelay",
        "type": "Application",
        "id": "https://{}/actor".format(request.host),
        "publicKey": {
            "id": "https://{}/actor#main-key".format(request.host),
            "owner": "https://{}/actor".format(request.host),
            "publicKeyPem": DATABASE["actorKeys"]["publicKey"]
        },
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": "https://{}/actor".format(request.host)
    }
    return aiohttp.web.json_response(data)


app.router.add_get('/actor', actor)


from .http_signatures import sign_headers


get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])


async def push_message_to_actor(actor, message, our_key_id):
    inbox = get_actor_inbox(actor)
    url = urlsplit(inbox)

    # XXX: Digest
    data = json.dumps(message)
    headers = {
        '(request-target)': 'post {}'.format(url.path),
        'Content-Length': str(len(data)),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay'
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    headers.pop('(request-target)')

    logging.debug('%r >> %r', inbox, message)

    try:
        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
            async with session.post(inbox, data=data, headers=headers) as resp:
                if resp.status in (200, 202):
                    # successful delivery: drop the instance from the backoff list
                    bi = DATABASE.get('backoff-instances', {})
                    if url.hostname in bi:
                        del bi[url.hostname]
                        DATABASE['backoff-instances'] = bi

                if resp.status == 202:
                    return

                resp_payload = await resp.text()
                logging.debug('%r >> resp %r', inbox, resp_payload)
    except Exception as e:
        # failed delivery: record it so distill_inboxes() can back off this host
        bi = DATABASE.get('backoff-instances', {})
        if url.hostname not in bi:
            bi[url.hostname] = {'ts': time.time(), 'count': 1}
        else:
            bi[url.hostname]['count'] += 1
        DATABASE['backoff-instances'] = bi
        logging.info('Caught %r while pushing to %r.', e, inbox)


async def follow_remote_actor(actor_uri):
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('following: %r', actor_uri)

    message = {
"@context": "https://www.w3.org/ns/activitystreams", "type": "Follow", "to": [actor['id']], "object": actor['id'], "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()), "actor": "https://{}/actor".format(AP_CONFIG['host']) } await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host'])) async def unfollow_remote_actor(actor_uri): actor = await fetch_actor(actor_uri) if not actor: logging.info('failed to fetch actor at: %r', actor_uri) return logging.info('unfollowing: %r', actor_uri) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Undo", "to": [actor['id']], "object": { "type": "Follow", "object": actor_uri, "actor": actor['id'], "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()) }, "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()), "actor": "https://{}/actor".format(AP_CONFIG['host']) } await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host'])) tag_re = re.compile(r'(|<[^>]*>)') def strip_html(data): no_tags = tag_re.sub('', data) return cgi.escape(no_tags) def distill_inboxes(actor, object_id): global DATABASE origin_hostname = urlsplit(object_id).hostname inbox = get_actor_inbox(actor) targets = [] hostnames = [] bi = DATABASE.get('backoff-instances',{}) for target in DATABASE.get('relay-list', []): hostname = urlsplit(target).hostname if target == inbox or hostname == origin_hostname: continue if hostname in bi: # hard pass it's been a day of errors if time.time()-bi[hostname]['ts'] > 86400: continue # timed retries if time.time()-bi[hostname]['ts'] < random.randrange(0, min(3600+bi[hostname]['count']*60,2 ** bi[hostname]['count']))+bi[hostname]['count']*10: continue targets.append(target) hostnames.append(hostname) assert inbox not in targets assert origin_hostname not in hostnames return targets def distill_object_id(activity): logging.debug('>> determining object ID for %r', activity['object']) obj = activity['object'] if isinstance(obj, str): return obj return obj['id'] async def handle_relay(actor, data, request): global CACHE global INBOUND_STATS INBOUND_STATS['processed']+=1 object_id = distill_object_id(data) if object_id in CACHE: logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id]) return activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4()) message = { "@context": "https://www.w3.org/ns/activitystreams", "type": "Announce", "to": ["https://{}/followers".format(request.host)], "actor": "https://{}/actor".format(request.host), "object": object_id, "id": activity_id } logging.debug('>> relay: %r', message) inboxes = distill_inboxes(actor, object_id) futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) CACHE[object_id] = activity_id async def handle_forward(actor, data, request): object_id = distill_object_id(data) logging.debug('>> Relay %r', data) inboxes = distill_inboxes(actor, object_id) futures = [ push_message_to_actor( {'inbox': inbox}, data, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes] asyncio.ensure_future(asyncio.gather(*futures)) async def handle_follow(actor, data, request): global DATABASE following = DATABASE.get('relay-list', []) inbox = get_actor_inbox(actor) if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']: return not_subd = DATABASE.get('not-subscribed',[]) if urlsplit(inbox).hostname in 
        not_subd.remove(urlsplit(inbox).hostname)
        DATABASE['not-subscribed'] = not_subd

    if inbox not in following:
        following += [inbox]
        DATABASE['relay-list'] = following

        if data['object'].endswith('/actor'):
            asyncio.ensure_future(follow_remote_actor(actor['id']))

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [actor["id"]],
        "actor": "https://{}/actor".format(request.host),

        # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
        "object": {
            "type": "Follow",
            "id": data["id"],
            "object": "https://{}/actor".format(request.host),
            "actor": actor["id"]
        },

        "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
    }

    asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))


async def handle_undo(actor, data, request):
    child = data['object']
    if child['type'] == 'Follow':
        following = DATABASE.get('relay-list', [])
        inbox = get_actor_inbox(actor)

        if inbox in following:
            following.remove(inbox)
            DATABASE['relay-list'] = following

        if child['object'].endswith('/actor'):
            await unfollow_remote_actor(actor['id'])


processors = {
    'Announce': handle_relay,
    'Create': handle_relay,
    'Delete': handle_forward,
    'Follow': handle_follow,
    'Undo': handle_undo,
    'Update': handle_forward,
}


async def inbox(request):
    data = await request.json()

    # validate before touching data['actor']
    if 'actor' not in data or not request['validated']:
        logging.info('Actor not in data or request not validated')
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    instance = urlsplit(data['actor']).hostname

    if data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE.get('relay-list', []):
        logging.info('Datatype not follow or instance not in relay-list: %r', instance)

        if data['type'] not in ['Announce', 'Delete']:
            logging.info('data: %r', data)

        if data['type'] in ['Announce', 'Create']:
            INBOUND_STATS['rejected'] += 1

        not_subd = DATABASE.get('not-subscribed', [])
        if instance not in not_subd:
            not_subd.append(instance)
            DATABASE['not-subscribed'] = not_subd

        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    if AP_CONFIG.get('whitelist_enabled') is True and instance not in AP_CONFIG.get('whitelist', []):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    # the instance reached us, so give it another try: clear any backoff entry
    bi = DATABASE.get('backoff-instances', {})
    if instance in bi:
        del bi[instance]
        DATABASE['backoff-instances'] = bi

    actor = await fetch_actor(data["actor"])
    actor_uri = 'https://{}/actor'.format(request.host)

    logging.debug(">> payload %r", data)

    processor = processors.get(data['type'], None)
    if processor:
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')


app.router.add_post('/inbox', inbox)
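

# Illustrative sketch only (not executed; hostnames and the activity id are
# placeholders): a remote instance subscribes by POSTing a Follow like the one
# below to /inbox. request['validated'] is assumed to be set by the HTTP
# signature checking layer; handle_follow() then stores the sender's shared
# inbox in DATABASE['relay-list'], replies with an Accept, and follows the
# remote actor back.
#
#   {
#       "@context": "https://www.w3.org/ns/activitystreams",
#       "type": "Follow",
#       "actor": "https://example.social/actor",
#       "object": "https://relay.example/actor",
#       "id": "https://example.social/activities/1"
#   }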