backoff instances / stats
This commit is contained in:
parent
d12772dcfb
commit
86f1d75b5e
1 changed files with 75 additions and 13 deletions
|
@ -6,6 +6,8 @@ import uuid
|
|||
import re
|
||||
import simplejson as json
|
||||
import cgi
|
||||
import time
|
||||
import random
|
||||
from urllib.parse import urlsplit
|
||||
from Crypto.PublicKey import RSA
|
||||
from .database import DATABASE
|
||||
|
@ -13,7 +15,6 @@ from .http_debug import http_debug
|
|||
|
||||
from cachetools import LFUCache
|
||||
|
||||
|
||||
# generate actor keys if not present
|
||||
if "actorKeys" not in DATABASE:
|
||||
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
|
||||
|
@ -30,7 +31,6 @@ if "actorKeys" not in DATABASE:
|
|||
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
|
||||
PUBKEY = PRIVKEY.publickey()
|
||||
|
||||
|
||||
from . import app, CONFIG
|
||||
from .remote_actor import fetch_actor
|
||||
|
||||
|
@ -38,10 +38,10 @@ from .remote_actor import fetch_actor
|
|||
AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]})
|
||||
CACHE_SIZE = CONFIG.get('cache-size', 16384)
|
||||
|
||||
INBOUND_STATS={'processed':0,'rejected':0}
|
||||
|
||||
CACHE = LFUCache(CACHE_SIZE)
|
||||
|
||||
|
||||
async def actor(request):
|
||||
data = {
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
|
@ -77,7 +77,6 @@ get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', ac
|
|||
|
||||
async def push_message_to_actor(actor, message, our_key_id):
|
||||
inbox = get_actor_inbox(actor)
|
||||
|
||||
url = urlsplit(inbox)
|
||||
|
||||
# XXX: Digest
|
||||
|
@ -93,14 +92,32 @@ async def push_message_to_actor(actor, message, our_key_id):
|
|||
|
||||
logging.debug('%r >> %r', inbox, message)
|
||||
|
||||
global DATABASE
|
||||
try:
|
||||
async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
|
||||
async with session.post(inbox, data=data, headers=headers) as resp:
|
||||
if resp.status == 202:
|
||||
# if we get a success let's delete from backoff
|
||||
bi = DATABASE.get('backoff-instances',{})
|
||||
if urlsplit(inbox).hostname in bi:
|
||||
del bi[urlsplit(inbox).hostname]
|
||||
DATABASE['backoff-instances'] = bi
|
||||
return
|
||||
elif resp.status == 200:
|
||||
# if we get a success let's delete from backoff
|
||||
bi = DATABASE.get('backoff-instances',{})
|
||||
if urlsplit(inbox).hostname in bi:
|
||||
del bi[urlsplit(inbox).hostname]
|
||||
DATABASE['backoff-instances'] = bi
|
||||
resp_payload = await resp.text()
|
||||
logging.debug('%r >> resp %r', inbox, resp_payload)
|
||||
except Exception as e:
|
||||
bi = DATABASE.get('backoff-instances',{})
|
||||
if url.hostname not in bi:
|
||||
bi[url.hostname] = {'ts':time.time(),'count':1}
|
||||
else:
|
||||
bi[url.hostname]['count'] +=1
|
||||
DATABASE['backoff-instances'] = bi
|
||||
logging.info('Caught %r while pushing to %r.', e, inbox)
|
||||
|
||||
|
||||
|
@ -124,6 +141,7 @@ async def follow_remote_actor(actor_uri):
|
|||
|
||||
|
||||
async def unfollow_remote_actor(actor_uri):
|
||||
return
|
||||
actor = await fetch_actor(actor_uri)
|
||||
if not actor:
|
||||
logging.info('failed to fetch actor at: %r', actor_uri)
|
||||
|
@ -159,9 +177,23 @@ def distill_inboxes(actor, object_id):
|
|||
origin_hostname = urlsplit(object_id).hostname
|
||||
|
||||
inbox = get_actor_inbox(actor)
|
||||
targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
|
||||
targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
|
||||
hostnames = [urlsplit(target).hostname for target in targets]
|
||||
targets = []
|
||||
hostnames = []
|
||||
bi = DATABASE.get('backoff-instances',{})
|
||||
for target in DATABASE.get('relay-list', []):
|
||||
hostname = urlsplit(target).hostname
|
||||
if target == inbox or hostname == origin_hostname:
|
||||
continue
|
||||
if hostname in bi:
|
||||
# hard pass it's been a day of errors
|
||||
if time.time()-bi[hostname]['ts'] > 86400:
|
||||
continue
|
||||
# timed retries
|
||||
if time.time()-bi[hostname]['ts'] < random.randrange(0, min(3600+bi[hostname]['count']*60,2 ** bi[hostname]['count']))+bi[hostname]['count']*10:
|
||||
continue
|
||||
targets.append(target)
|
||||
hostnames.append(hostname)
|
||||
|
||||
|
||||
assert inbox not in targets
|
||||
assert origin_hostname not in hostnames
|
||||
|
@ -181,6 +213,9 @@ def distill_object_id(activity):
|
|||
|
||||
async def handle_relay(actor, data, request):
|
||||
global CACHE
|
||||
global INBOUND_STATS
|
||||
|
||||
INBOUND_STATS['processed']+=1
|
||||
|
||||
object_id = distill_object_id(data)
|
||||
|
||||
|
@ -209,7 +244,7 @@ async def handle_relay(actor, data, request):
|
|||
CACHE[object_id] = activity_id
|
||||
|
||||
|
||||
async def handle_delete(actor, data, request):
|
||||
async def handle_forward(actor, data, request):
|
||||
object_id = distill_object_id(data)
|
||||
|
||||
logging.debug('>> Relay %r', data)
|
||||
|
@ -234,6 +269,11 @@ async def handle_follow(actor, data, request):
|
|||
if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
|
||||
return
|
||||
|
||||
not_subd = DATABASE.get('not-subscribed',[])
|
||||
if urlsplit(inbox).hostname in not_subd:
|
||||
not_subd.remove(urlsplit(inbox).hostname)
|
||||
DATABASE['not-subscribed'] = not_subd
|
||||
|
||||
if inbox not in following:
|
||||
following += [inbox]
|
||||
DATABASE['relay-list'] = following
|
||||
|
@ -281,21 +321,43 @@ async def handle_undo(actor, data, request):
|
|||
processors = {
|
||||
'Announce': handle_relay,
|
||||
'Create': handle_relay,
|
||||
'Delete': handle_delete,
|
||||
'Delete': handle_forward,
|
||||
'Follow': handle_follow,
|
||||
'Undo': handle_undo
|
||||
'Undo': handle_undo,
|
||||
'Update': handle_forward,
|
||||
}
|
||||
|
||||
|
||||
async def inbox(request):
|
||||
data = await request.json()
|
||||
instance = urlsplit(data['actor']).hostname
|
||||
global DATABASE
|
||||
|
||||
if 'actor' not in data or not request['validated']:
|
||||
logging.info('Actor not in data or request not validated')
|
||||
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
||||
|
||||
if instance in AP_CONFIG['blocked_instances'] or instance in DATABASE.get('FAILED_CHECKS',{}):
|
||||
INBOUND_STATS['rejected']+=1
|
||||
logging.info('Blocked instance')
|
||||
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
||||
|
||||
if data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
|
||||
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
||||
logging.info('Datatype not follow or instance not in relay-list: %r',instance)
|
||||
if data['type'] not in ['Announce','Delete']:
|
||||
logging.info('data: %r',data)
|
||||
if data['type'] in ['Announce','Create']:
|
||||
INBOUND_STATS['rejected']+=1
|
||||
not_subd = DATABASE.get('not-subscribed',[])
|
||||
if instance not in not_subd:
|
||||
not_subd.append(instance)
|
||||
DATABASE['not-subscribed'] = not_subd
|
||||
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
|
||||
|
||||
# let's give a try and remove the instance from backoff if it sends us a message
|
||||
bi = DATABASE.get('backoff-instances',{})
|
||||
if instance in bi:
|
||||
del bi[instance]
|
||||
DATABASE['backoff-instances'] = bi
|
||||
|
||||
actor = await fetch_actor(data["actor"])
|
||||
actor_uri = 'https://{}/actor'.format(request.host)
|
||||
|
@ -305,7 +367,7 @@ async def inbox(request):
|
|||
processor = processors.get(data['type'], None)
|
||||
if processor:
|
||||
await processor(actor, data, request)
|
||||
|
||||
|
||||
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue