diff --git a/relay/actor.py b/relay/actor.py index b2117fd..74d34ec 100644 --- a/relay/actor.py +++ b/relay/actor.py @@ -6,6 +6,8 @@ import uuid import re import simplejson as json import cgi +import time +import random from urllib.parse import urlsplit from Crypto.PublicKey import RSA from .database import DATABASE @@ -13,7 +15,6 @@ from .http_debug import http_debug from cachetools import LFUCache - # generate actor keys if not present if "actorKeys" not in DATABASE: logging.info("No actor keys present, generating 4096-bit RSA keypair.") @@ -30,7 +31,6 @@ if "actorKeys" not in DATABASE: PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"]) PUBKEY = PRIVKEY.publickey() - from . import app, CONFIG from .remote_actor import fetch_actor @@ -38,10 +38,10 @@ from .remote_actor import fetch_actor AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]}) CACHE_SIZE = CONFIG.get('cache-size', 16384) +INBOUND_STATS={'processed':0,'rejected':0} CACHE = LFUCache(CACHE_SIZE) - async def actor(request): data = { "@context": "https://www.w3.org/ns/activitystreams", @@ -77,7 +77,6 @@ get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', ac async def push_message_to_actor(actor, message, our_key_id): inbox = get_actor_inbox(actor) - url = urlsplit(inbox) # XXX: Digest @@ -93,14 +92,32 @@ async def push_message_to_actor(actor, message, our_key_id): logging.debug('%r >> %r', inbox, message) + global DATABASE try: async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session: async with session.post(inbox, data=data, headers=headers) as resp: if resp.status == 202: + # if we get a success let's delete from backoff + bi = DATABASE.get('backoff-instances',{}) + if urlsplit(inbox).hostname in bi: + del bi[urlsplit(inbox).hostname] + DATABASE['backoff-instances'] = bi return + elif resp.status == 200: + # if we get a success let's delete from backoff + bi = DATABASE.get('backoff-instances',{}) + if urlsplit(inbox).hostname in bi: + del bi[urlsplit(inbox).hostname] + DATABASE['backoff-instances'] = bi resp_payload = await resp.text() logging.debug('%r >> resp %r', inbox, resp_payload) except Exception as e: + bi = DATABASE.get('backoff-instances',{}) + if url.hostname not in bi: + bi[url.hostname] = {'ts':time.time(),'count':1} + else: + bi[url.hostname]['count'] +=1 + DATABASE['backoff-instances'] = bi logging.info('Caught %r while pushing to %r.', e, inbox) @@ -124,6 +141,7 @@ async def follow_remote_actor(actor_uri): async def unfollow_remote_actor(actor_uri): + return actor = await fetch_actor(actor_uri) if not actor: logging.info('failed to fetch actor at: %r', actor_uri) @@ -159,9 +177,23 @@ def distill_inboxes(actor, object_id): origin_hostname = urlsplit(object_id).hostname inbox = get_actor_inbox(actor) - targets = [target for target in DATABASE.get('relay-list', []) if target != inbox] - targets = [target for target in targets if urlsplit(target).hostname != origin_hostname] - hostnames = [urlsplit(target).hostname for target in targets] + targets = [] + hostnames = [] + bi = DATABASE.get('backoff-instances',{}) + for target in DATABASE.get('relay-list', []): + hostname = urlsplit(target).hostname + if target == inbox or hostname == origin_hostname: + continue + if hostname in bi: + # hard pass it's been a day of errors + if time.time()-bi[hostname]['ts'] > 86400: + continue + # timed retries + if time.time()-bi[hostname]['ts'] < random.randrange(0, min(3600+bi[hostname]['count']*60,2 ** bi[hostname]['count']))+bi[hostname]['count']*10: + continue + targets.append(target) + hostnames.append(hostname) + assert inbox not in targets assert origin_hostname not in hostnames @@ -181,6 +213,9 @@ def distill_object_id(activity): async def handle_relay(actor, data, request): global CACHE + global INBOUND_STATS + + INBOUND_STATS['processed']+=1 object_id = distill_object_id(data) @@ -209,7 +244,7 @@ async def handle_relay(actor, data, request): CACHE[object_id] = activity_id -async def handle_delete(actor, data, request): +async def handle_forward(actor, data, request): object_id = distill_object_id(data) logging.debug('>> Relay %r', data) @@ -234,6 +269,11 @@ async def handle_follow(actor, data, request): if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']: return + not_subd = DATABASE.get('not-subscribed',[]) + if urlsplit(inbox).hostname in not_subd: + not_subd.remove(urlsplit(inbox).hostname) + DATABASE['not-subscribed'] = not_subd + if inbox not in following: following += [inbox] DATABASE['relay-list'] = following @@ -281,21 +321,43 @@ async def handle_undo(actor, data, request): processors = { 'Announce': handle_relay, 'Create': handle_relay, - 'Delete': handle_delete, + 'Delete': handle_forward, 'Follow': handle_follow, - 'Undo': handle_undo + 'Undo': handle_undo, + 'Update': handle_forward, } - async def inbox(request): data = await request.json() instance = urlsplit(data['actor']).hostname + global DATABASE if 'actor' not in data or not request['validated']: + logging.info('Actor not in data or request not validated') + raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') + + if instance in AP_CONFIG['blocked_instances'] or instance in DATABASE.get('FAILED_CHECKS',{}): + INBOUND_STATS['rejected']+=1 + logging.info('Blocked instance') raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') if data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']: - raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain') + logging.info('Datatype not follow or instance not in relay-list: %r',instance) + if data['type'] not in ['Announce','Delete']: + logging.info('data: %r',data) + if data['type'] in ['Announce','Create']: + INBOUND_STATS['rejected']+=1 + not_subd = DATABASE.get('not-subscribed',[]) + if instance not in not_subd: + not_subd.append(instance) + DATABASE['not-subscribed'] = not_subd + return aiohttp.web.Response(body=b'{}', content_type='application/activity+json') + + # let's give a try and remove the instance from backoff if it sends us a message + bi = DATABASE.get('backoff-instances',{}) + if instance in bi: + del bi[instance] + DATABASE['backoff-instances'] = bi actor = await fetch_actor(data["actor"]) actor_uri = 'https://{}/actor'.format(request.host) @@ -305,7 +367,7 @@ async def inbox(request): processor = processors.get(data['type'], None) if processor: await processor(actor, data, request) - + return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')