sedi-relay/relay/actor.py
2019-07-25 19:15:25 +00:00

377 lines
12 KiB
Python

import asyncio
import cgi
import html
import logging
import random
import re
import time
import uuid
from urllib.parse import urlsplit

import aiohttp
import aiohttp.web
import simplejson as json
from cachetools import LFUCache
from Crypto.PublicKey import RSA

from .database import DATABASE
from .http_debug import http_debug
# Bootstrap the relay's signing identity: the first time this module is loaded
# against a database without one, create an RSA keypair and persist it.
if "actorKeys" not in DATABASE:
    logging.info("No actor keys present, generating 4096-bit RSA keypair.")

    privkey = RSA.generate(4096)
    pubkey = privkey.publickey()

    # Both halves are stored as PEM text, public key first.
    DATABASE["actorKeys"] = {
        label: key.exportKey('PEM').decode('utf-8')
        for label, key in (("publicKey", pubkey), ("privateKey", privkey))
    }

# Always load from the persisted private key; the public key is derived from it.
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
from . import app, CONFIG
from .remote_actor import fetch_actor
# Defaults for the ``ap`` config section. They are merged underneath whatever
# the config supplies, so that a partial section (for example one written
# without the whitelist settings) cannot cause a KeyError when a handler
# reads AP_CONFIG['whitelist_enabled'] or AP_CONFIG['whitelist'].
_AP_DEFAULTS = {
    'host': 'localhost',
    'blocked_instances': [],
    'whitelist_enabled': False,
    'whitelist': [],
}
AP_CONFIG = {**_AP_DEFAULTS, **(CONFIG.get('ap') or {})}

# Capacity of the relayed-object cache below.
CACHE_SIZE = CONFIG.get('cache-size', 16384)

# Counters for inbound activities; incremented by the inbox handlers.
INBOUND_STATS = {'processed': 0, 'rejected': 0}

# Maps an object ID to the ID of the Announce the relay issued for it, so the
# same object is not relayed twice.
CACHE = LFUCache(CACHE_SIZE)
async def actor(request):
    """Serve the relay's own ActivityPub actor document as JSON.

    Every URL in the document is built from the host the request was made
    to; the public key is the PEM text stored under ``actorKeys``.
    """
    base = "https://{}".format(request.host)
    actor_url = base + "/actor"
    inbox_url = base + "/inbox"

    public_key = {
        "id": actor_url + "#main-key",
        "owner": actor_url,
        "publicKeyPem": DATABASE["actorKeys"]["publicKey"],
    }

    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {"sharedInbox": inbox_url},
        "followers": base + "/followers",
        "following": base + "/following",
        "inbox": inbox_url,
        "name": "ActivityRelay",
        "type": "Application",
        "id": actor_url,
        "publicKey": public_key,
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": actor_url,
    }
    return aiohttp.web.json_response(document)


app.router.add_get('/actor', actor)
from .http_signatures import sign_headers
def get_actor_inbox(actor):
    """Return the inbox URL that deliveries to *actor* should be POSTed to.

    The ``endpoints.sharedInbox`` value is used when the actor document has
    one; otherwise the actor's own ``inbox`` is returned.

    Raises:
        KeyError: if the actor document has neither value.
    """
    endpoints = actor.get('endpoints', {})
    if 'sharedInbox' in endpoints:
        return endpoints['sharedInbox']
    # ``inbox`` is only looked up when it is actually needed, so a document
    # that offers just a shared inbox is still usable.
    return actor['inbox']
async def push_message_to_actor(actor, message, our_key_id):
    """Sign *message* and POST it to the inbox of *actor*.

    A 200 or 202 response removes any back-off entry recorded for the target
    host. Any exception raised while posting is logged and recorded in the
    ``backoff-instances`` table (timestamp of the first failure plus a
    failure count) rather than propagated to the caller.
    """
    inbox = get_actor_inbox(actor)
    url = urlsplit(inbox)

    # XXX: Digest
    data = json.dumps(message)
    headers = {
        '(request-target)': 'post {}'.format(url.path),
        'Content-Length': str(len(data)),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay'
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    # '(request-target)' is needed for signing only; drop it before sending.
    del headers['(request-target)']

    logging.debug('%r >> %r', inbox, message)

    try:
        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
            async with session.post(inbox, data=data, headers=headers) as resp:
                status = resp.status
                if status in (200, 202):
                    # if we get a success let's delete from backoff
                    target_host = urlsplit(inbox).hostname
                    backoff = DATABASE.get('backoff-instances', {})
                    if target_host in backoff:
                        del backoff[target_host]
                        DATABASE['backoff-instances'] = backoff
                    if status == 202:
                        # accepted: the response body is not read or logged
                        return

                resp_payload = await resp.text()
                logging.debug('%r >> resp %r', inbox, resp_payload)
    except Exception as e:
        failed_host = url.hostname
        backoff = DATABASE.get('backoff-instances', {})
        if failed_host in backoff:
            backoff[failed_host]['count'] += 1
        else:
            # first failure: remember when the trouble started
            backoff[failed_host] = {'ts': time.time(), 'count': 1}
        DATABASE['backoff-instances'] = backoff
        logging.info('Caught %r while pushing to %r.', e, inbox)
async def follow_remote_actor(actor_uri):
    """Send a Follow from the relay to the actor at *actor_uri*.

    Nothing is sent when the actor cannot be fetched, or when whitelisting
    is enabled and the actor's host is not on the whitelist.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    whitelisting = AP_CONFIG['whitelist_enabled'] is True
    if whitelisting and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
        logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
        return

    logging.info('following: %r', actor_uri)

    relay_host = AP_CONFIG['host']
    target_id = actor['id']
    follow = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Follow",
        "to": [target_id],
        "object": target_id,
        "id": "https://{}/activities/{}".format(relay_host, uuid.uuid4()),
        "actor": "https://{}/actor".format(relay_host)
    }
    key_id = "https://{}/actor#main-key".format(relay_host)
    await push_message_to_actor(actor, follow, key_id)
async def unfollow_remote_actor(actor_uri):
    """Send an Undo wrapping a Follow to the actor at *actor_uri*.

    Nothing is sent when the actor cannot be fetched.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('unfollowing: %r', actor_uri)

    relay_host = AP_CONFIG['host']
    remote_id = actor['id']

    # The Follow being undone, embedded in the Undo with an ID of its own.
    undone_follow = {
        "type": "Follow",
        "object": actor_uri,
        "actor": remote_id,
        "id": "https://{}/activities/{}".format(relay_host, uuid.uuid4())
    }
    undo = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Undo",
        "to": [remote_id],
        "object": undone_follow,
        "id": "https://{}/activities/{}".format(relay_host, uuid.uuid4()),
        "actor": "https://{}/actor".format(relay_host)
    }
    key_id = "https://{}/actor#main-key".format(relay_host)
    await push_message_to_actor(actor, undo, key_id)
# Matches an HTML comment (non-greedy) or any single <...> tag.
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')


def strip_html(data):
    """Remove HTML comments and tags from *data* and escape what is left.

    ``&``, ``<`` and ``>`` in the remaining text are replaced with HTML
    entities. Quote characters are left as they are.
    """
    no_tags = tag_re.sub('', data)
    # cgi.escape() was removed in Python 3.8. html.escape() with quote=False
    # performs the same three replacements cgi.escape() did by default.
    return html.escape(no_tags, quote=False)
def distill_inboxes(actor, object_id):
    """List the subscribed inboxes an activity should be delivered to.

    Leaves out the sender's own inbox, every inbox on the host that
    *object_id* points at, and hosts that are currently in back-off.
    """
    origin_hostname = urlsplit(object_id).hostname
    sender_inbox = get_actor_inbox(actor)
    backoff = DATABASE.get('backoff-instances', {})

    targets = []
    hostnames = []
    for candidate in DATABASE.get('relay-list', []):
        hostname = urlsplit(candidate).hostname
        if candidate == sender_inbox or hostname == origin_hostname:
            continue

        if hostname in backoff:
            record = backoff[hostname]
            failures = record['count']

            # hard pass it's been a day of errors
            if time.time() - record['ts'] > 86400:
                continue

            # timed retries: skip while the time since the recorded failure
            # timestamp is below a randomised delay that grows with the
            # failure count
            jitter_ceiling = min(3600 + failures * 60, 2 ** failures)
            if time.time() - record['ts'] < random.randrange(0, jitter_ceiling) + failures * 10:
                continue

        targets.append(candidate)
        hostnames.append(hostname)

    # sanity checks on the filtering above
    assert sender_inbox not in targets
    assert origin_hostname not in hostnames
    return targets
def distill_object_id(activity):
    """Return the ID of the object that *activity* refers to.

    The ``object`` member may be a bare ID string, which is returned as is,
    or an embedded object, in which case its ``id`` member is returned.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)
    return target if isinstance(target, str) else target['id']
async def handle_relay(actor, data, request):
    """Announce the object of an incoming activity to the other subscribers.

    The activity is counted as processed first. If its object has already
    been relayed nothing more happens; otherwise an Announce is built,
    delivery is scheduled in the background (not awaited), and the object is
    remembered in the cache.
    """
    INBOUND_STATS['processed'] += 1

    object_id = distill_object_id(data)
    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    relay_base = "https://{}".format(request.host)
    activity_id = "{}/activities/{}".format(relay_base, uuid.uuid4())
    announce = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        "to": [relay_base + "/followers"],
        "actor": relay_base + "/actor",
        "object": object_id,
        "id": activity_id
    }
    logging.debug('>> relay: %r', announce)

    key_id = relay_base + '/actor#main-key'
    deliveries = []
    for target in distill_inboxes(actor, object_id):
        deliveries.append(push_message_to_actor({'inbox': target}, announce, key_id))
    asyncio.ensure_future(asyncio.gather(*deliveries))

    CACHE[object_id] = activity_id
async def handle_forward(actor, data, request):
    """Pass an incoming activity on, unchanged, to the other subscribers.

    Delivery is scheduled in the background and is not awaited.
    """
    object_id = distill_object_id(data)
    logging.debug('>> Relay %r', data)

    key_id = 'https://{}/actor#main-key'.format(request.host)
    deliveries = []
    for target in distill_inboxes(actor, object_id):
        deliveries.append(push_message_to_actor({'inbox': target}, data, key_id))
    asyncio.ensure_future(asyncio.gather(*deliveries))
async def handle_follow(actor, data, request):
    """Subscribe the following actor's inbox to the relay and send an Accept.

    Follows whose inbox host is in ``blocked_instances`` are dropped without
    a reply. A newly added inbox is stored in ``relay-list``, and if the
    Follow targeted an ``/actor`` URL the relay follows the sender back.
    """
    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)
    inbox_host = urlsplit(inbox).hostname

    if inbox_host in AP_CONFIG['blocked_instances']:
        return

    # the host is subscribing now, so take it off the not-subscribed list
    not_subd = DATABASE.get('not-subscribed', [])
    if inbox_host in not_subd:
        not_subd.remove(inbox_host)
        DATABASE['not-subscribed'] = not_subd

    if inbox not in following:
        following.append(inbox)
        DATABASE['relay-list'] = following

        if data['object'].endswith('/actor'):
            asyncio.ensure_future(follow_remote_actor(actor['id']))

    relay_base = "https://{}".format(request.host)
    follower_id = actor["id"]

    # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
    accepted_follow = {
        "type": "Follow",
        "id": data["id"],
        "object": relay_base + "/actor",
        "actor": follower_id
    }
    accept = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [follower_id],
        "actor": relay_base + "/actor",
        "object": accepted_follow,
        "id": "{}/activities/{}".format(relay_base, uuid.uuid4()),
    }

    asyncio.ensure_future(push_message_to_actor(actor, accept, relay_base + '/actor#main-key'))
async def handle_undo(actor, data, request):
    """Process an Undo activity; only an undone Follow has any effect.

    For an undone Follow the sender's inbox is removed from ``relay-list``
    and, if the Follow targeted an ``/actor`` URL, the relay unfollows the
    sender in return. Any other Undo is ignored.
    """
    child = data['object']

    # The object may arrive as a bare ID string rather than an embedded
    # object (distill_object_id accepts both shapes). A bare ID carries no
    # type to inspect, so it is ignored instead of raising on child['type'].
    if not isinstance(child, dict) or child.get('type') != 'Follow':
        return

    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    if inbox in following:
        following.remove(inbox)
        DATABASE['relay-list'] = following

    if child['object'].endswith('/actor'):
        await unfollow_remote_actor(actor['id'])
# Dispatch table used by the inbox: activity ``type`` -> handler coroutine.
# Create/Announce are re-announced, Delete/Update are forwarded as received.
processors = dict(
    Announce=handle_relay,
    Create=handle_relay,
    Delete=handle_forward,
    Follow=handle_follow,
    Undo=handle_undo,
    Update=handle_forward,
)
async def inbox(request):
    """Receive an activity POSTed to the relay's inbox and dispatch it.

    Responds 401 when the payload has no ``actor`` or the request is not
    marked validated, when a non-Follow activity comes from a host whose
    inbox is not in ``relay-list`` (the host is then remembered under
    ``not-subscribed``), or when whitelisting is enabled and the host is not
    whitelisted. Otherwise the sender's back-off entry is cleared, the
    activity is handed to the handler registered for its type (if any), and
    an empty JSON object is returned.

    Raises:
        aiohttp.web.HTTPUnauthorized: for the rejection cases above.
    """
    data = await request.json()

    # Check for the actor *before* dereferencing it, so a payload without
    # one gets the intended 401 rather than failing with a KeyError.
    if 'actor' not in data or not request['validated']:
        logging.info('Actor not in data or request not validated')
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    instance = urlsplit(data['actor']).hostname
    # A payload without a type is treated like any other unhandled type.
    activity_type = data.get('type')

    if activity_type != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE.get('relay-list', []):
        logging.info('Datatype not follow or instance not in relay-list: %r', instance)
        if activity_type not in ['Announce', 'Delete']:
            logging.info('data: %r', data)
        if activity_type in ['Announce', 'Create']:
            INBOUND_STATS['rejected'] += 1
        not_subd = DATABASE.get('not-subscribed', [])
        if instance not in not_subd:
            not_subd.append(instance)
            DATABASE['not-subscribed'] = not_subd
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    if AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    # let's give a try and remove the instance from backoff if it sends us a message
    bi = DATABASE.get('backoff-instances', {})
    if instance in bi:
        del bi[instance]
        DATABASE['backoff-instances'] = bi

    # NOTE(review): follow_remote_actor treats a falsy fetch_actor() result as
    # a failed fetch; here the result is passed to the handler unchecked.
    actor = await fetch_actor(data["actor"])

    logging.debug(">> payload %r", data)

    processor = processors.get(activity_type, None)
    if processor:
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')


app.router.add_post('/inbox', inbox)