Merge branch 'dev' into 'master'

Version 0.2.4

See merge request pleroma/relay!46
This commit is contained in:
Izalia Mae 2022-12-11 00:01:17 +00:00
commit bbdc151ed3
16 changed files with 791 additions and 688 deletions

View file

@ -24,6 +24,27 @@ Run the setup wizard to configure your relay.
activityrelay setup activityrelay setup
## Config
Manage the relay config
activityrelay config
### List
List the current config key/value pairs
activityrelay config list
### Set
Set a value for a config option
activityrelay config set <key> <value>
## Inbox ## Inbox
Manage the list of subscribed instances. Manage the list of subscribed instances.
@ -97,6 +118,13 @@ Remove a domain from the whitelist.
activityrelay whitelist remove <domain> activityrelay whitelist remove <domain>
### Import
Add all current inboxes to the whitelist
activityrelay whitelist import
## Instance ## Instance
Manage the instance ban list. Manage the instance ban list.

View file

@ -1,6 +1,8 @@
# Configuration # Configuration
## DB ## General
### DB
The path to the database. It contains the relay actor private key and all subscribed The path to the database. It contains the relay actor private key and all subscribed
instances. If the path is not absolute, it is relative to the working directory. instances. If the path is not absolute, it is relative to the working directory.
@ -8,7 +10,7 @@ instances. If the path is not absolute, it is relative to the working directory.
db: relay.jsonld db: relay.jsonld
## Listener ### Listener
The address and port the relay will listen on. If the reverse proxy (nginx, apache, caddy, etc) The address and port the relay will listen on. If the reverse proxy (nginx, apache, caddy, etc)
is running on the same host, it is recommended to change `listen` to `localhost` is running on the same host, it is recommended to change `listen` to `localhost`
@ -17,22 +19,41 @@ is running on the same host, it is recommended to change `listen` to `localhost`
port: 8080 port: 8080
## Note ### Note
A small blurb to describe your relay instance. This will show up on the relay's home page. A small blurb to describe your relay instance. This will show up on the relay's home page.
note: "Make a note about your instance here." note: "Make a note about your instance here."
## Post Limit ### Post Limit
The maximum number of messages to send out at once. For each incoming message, a message will be The maximum number of messages to send out at once. For each incoming message, a message will be
sent out to every subscribed instance minus the instance which sent the message. This limit sent out to every subscribed instance minus the instance which sent the message. This limit
is to prevent too many outgoing connections from being made, so adjust if necessary. is to prevent too many outgoing connections from being made, so adjust if necessary.
Note: If the `workers` option is set to anything above 0, this limit will be per worker.
push_limit: 512 push_limit: 512
### Push Workers
The relay can be configured to use threads to push messages out. For smaller relays, this isn't
necessary, but bigger ones (>100 instances) will want to set this to the number of available cpu
threads.
workers: 0
### JSON GET cache limit
JSON objects (actors, nodeinfo, etc) will get cached when fetched. This will set the max number of
objects to keep in the cache.
json_cache: 1024
## AP ## AP
Various ActivityPub-related settings Various ActivityPub-related settings
@ -82,29 +103,3 @@ setting this to the below list will block all other relays and prevent relay cha
- aoderelay - aoderelay
- social.seattle.wa.us-relay - social.seattle.wa.us-relay
- unciarelay - unciarelay
## Cache
These are object limits for various caches. Only change if you know what you're doing.
### Objects
The urls of messages which have been processed by the relay.
objects: 1024
### Actors
The ActivityPub actors of incoming messages.
actors: 1024
### Actors
The base64 encoded hashes of messages.
digests: 1024

View file

@ -15,7 +15,7 @@ the [official pipx docs](https://pypa.github.io/pipx/installation/) for more in-
Now simply install ActivityRelay directly from git Now simply install ActivityRelay directly from git
pipx install git+https://git.pleroma.social/pleroma/relay@0.2.3 pipx install git+https://git.pleroma.social/pleroma/relay@0.2.4
Or from a cloned git repo. Or from a cloned git repo.
@ -39,7 +39,7 @@ be installed via [pyenv](https://github.com/pyenv/pyenv).
The instructions for installation via pip are very similar to pipx. Installation can be done from The instructions for installation via pip are very similar to pipx. Installation can be done from
git git
python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.3 python3 -m pip install git+https://git.pleroma.social/pleroma/relay@0.2.4
or a cloned git repo. or a cloned git repo.

View file

@ -9,13 +9,19 @@ port: 8080
# Note # Note
note: "Make a note about your instance here." note: "Make a note about your instance here."
# maximum number of inbox posts to do at once # Number of worker threads to start. If 0, use asyncio futures instead of threads.
workers: 0
# Maximum number of inbox posts to do at once
# If workers is set to 1 or above, this is the max for each worker
push_limit: 512 push_limit: 512
# this section is for ActivityPub # The amount of json objects to cache from GET requests
json_cache: 1024
ap: ap:
# this is used for generating activitypub messages, as well as instructions for # This is used for generating activitypub messages, as well as instructions for
# linking AP identities. it should be an SSL-enabled domain reachable by https. # linking AP identities. It should be an SSL-enabled domain reachable by https.
host: 'relay.example.com' host: 'relay.example.com'
blocked_instances: blocked_instances:
@ -35,9 +41,3 @@ ap:
#- 'aoderelay' #- 'aoderelay'
#- 'social.seattle.wa.us-relay' #- 'social.seattle.wa.us-relay'
#- 'unciarelay' #- 'unciarelay'
# cache limits as number of items. only change this if you know what you're doing
cache:
objects: 1024
json: 1024
digests: 1024

View file

@ -1,3 +1,3 @@
__version__ = '0.2.3' __version__ = '0.2.4'
from . import logger from . import logger

View file

@ -1,14 +1,17 @@
import asyncio import asyncio
import logging import logging
import os import os
import queue
import signal import signal
import threading
import traceback
from aiohttp import web from aiohttp import web
from cachetools import LRUCache
from datetime import datetime, timedelta from datetime import datetime, timedelta
from .config import RelayConfig from .config import RelayConfig
from .database import RelayDatabase from .database import RelayDatabase
from .http_client import HttpClient
from .misc import DotDict, check_open_port, set_app from .misc import DotDict, check_open_port, set_app
from .views import routes from .views import routes
@ -19,25 +22,39 @@ class Application(web.Application):
self['starttime'] = None self['starttime'] = None
self['running'] = False self['running'] = False
self['is_docker'] = bool(os.environ.get('DOCKER_RUNNING')) self['config'] = RelayConfig(cfgpath)
self['config'] = RelayConfig(cfgpath, self['is_docker'])
if not self['config'].load(): if not self['config'].load():
self['config'].save() self['config'].save()
if self.config.is_docker:
self.config.update({
'db': '/data/relay.jsonld',
'listen': '0.0.0.0',
'port': 8080
})
self['workers'] = []
self['last_worker'] = 0
set_app(self)
self['database'] = RelayDatabase(self['config']) self['database'] = RelayDatabase(self['config'])
self['database'].load() self['database'].load()
self['cache'] = DotDict({key: Cache(maxsize=self['config'][key]) for key in self['config'].cachekeys}) self['client'] = HttpClient(
self['semaphore'] = asyncio.Semaphore(self['config'].push_limit) database = self.database,
limit = self.config.push_limit,
timeout = self.config.timeout,
cache_size = self.config.json_cache
)
self.set_signal_handler() self.set_signal_handler()
set_app(self)
@property @property
def cache(self): def client(self):
return self['cache'] return self['client']
@property @property
@ -50,16 +67,6 @@ class Application(web.Application):
return self['database'] return self['database']
@property
def is_docker(self):
return self['is_docker']
@property
def semaphore(self):
return self['semaphore']
@property @property
def uptime(self): def uptime(self):
if not self['starttime']: if not self['starttime']:
@ -70,6 +77,19 @@ class Application(web.Application):
return timedelta(seconds=uptime.seconds) return timedelta(seconds=uptime.seconds)
def push_message(self, inbox, message):
if self.config.workers <= 0:
return asyncio.ensure_future(self.client.post(inbox, message))
worker = self['workers'][self['last_worker']]
worker.queue.put((inbox, message))
self['last_worker'] += 1
if self['last_worker'] >= len(self['workers']):
self['last_worker'] = 0
def set_signal_handler(self): def set_signal_handler(self):
for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}: for sig in {'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'}:
try: try:
@ -85,9 +105,6 @@ class Application(web.Application):
return logging.error(f'A server is already running on port {self.config.port}') return logging.error(f'A server is already running on port {self.config.port}')
for route in routes: for route in routes:
if route[1] == '/stats' and logging.DEBUG < logging.root.level:
continue
self.router.add_route(*route) self.router.add_route(*route)
logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})') logging.info(f'Starting webserver at {self.config.host} ({self.config.listen}:{self.config.port})')
@ -101,6 +118,13 @@ class Application(web.Application):
async def handle_run(self): async def handle_run(self):
self['running'] = True self['running'] = True
if self.config.workers > 0:
for i in range(self.config.workers):
worker = PushWorker(self)
worker.start()
self['workers'].append(worker)
runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"') runner = web.AppRunner(self, access_log_format='%{X-Forwarded-For}i "%r" %s %b "%{User-Agent}i"')
await runner.setup() await runner.setup()
@ -120,8 +144,73 @@ class Application(web.Application):
self['starttime'] = None self['starttime'] = None
self['running'] = False self['running'] = False
self['workers'].clear()
class Cache(LRUCache): class PushWorker(threading.Thread):
def set_maxsize(self, value): def __init__(self, app):
self.__maxsize = int(value) threading.Thread.__init__(self)
self.app = app
self.queue = queue.Queue()
def run(self):
self.client = HttpClient(
database = self.app.database,
limit = self.app.config.push_limit,
timeout = self.app.config.timeout,
cache_size = self.app.config.json_cache
)
asyncio.run(self.handle_queue())
async def handle_queue(self):
while self.app['running']:
try:
inbox, message = self.queue.get(block=True, timeout=0.25)
self.queue.task_done()
logging.verbose(f'New push from Thread-{threading.get_ident()}')
await self.client.post(inbox, message)
except queue.Empty:
pass
## make sure an exception doesn't bring down the worker
except Exception:
traceback.print_exc()
await self.client.close()
## Can't sub-class web.Request, so let's just add some properties
def request_actor(self):
try: return self['actor']
except KeyError: pass
def request_instance(self):
try: return self['instance']
except KeyError: pass
def request_message(self):
try: return self['message']
except KeyError: pass
def request_signature(self):
if 'signature' not in self._state:
try: self['signature'] = DotDict.new_from_signature(self.headers['signature'])
except KeyError: return
return self['signature']
setattr(web.Request, 'actor', property(request_actor))
setattr(web.Request, 'instance', property(request_instance))
setattr(web.Request, 'message', property(request_message))
setattr(web.Request, 'signature', property(request_signature))
setattr(web.Request, 'config', property(lambda self: self.app.config))
setattr(web.Request, 'database', property(lambda self: self.app.database))

View file

@ -1,60 +1,51 @@
import json import json
import os
import yaml import yaml
from functools import cached_property
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
from .misc import DotDict from .misc import DotDict, boolean
relay_software_names = [ RELAY_SOFTWARE = [
'activityrelay', 'activityrelay', # https://git.pleroma.social/pleroma/relay
'aoderelay', 'aoderelay', # https://git.asonix.dog/asonix/relay
'social.seattle.wa.us-relay', 'feditools-relay' # https://git.ptzo.gdn/feditools/relay
'unciarelay' ]
APKEYS = [
'host',
'whitelist_enabled',
'blocked_software',
'blocked_instances',
'whitelist'
] ]
class RelayConfig(DotDict): class RelayConfig(DotDict):
apkeys = { def __init__(self, path):
'host',
'whitelist_enabled',
'blocked_software',
'blocked_instances',
'whitelist'
}
cachekeys = {
'json',
'objects',
'digests'
}
def __init__(self, path, is_docker):
DotDict.__init__(self, {}) DotDict.__init__(self, {})
if is_docker: if self.is_docker:
path = '/data/relay.yaml' path = '/data/config.yaml'
self._isdocker = is_docker
self._path = Path(path).expanduser() self._path = Path(path).expanduser()
self.reset() self.reset()
def __setitem__(self, key, value): def __setitem__(self, key, value):
if self._isdocker and key in ['db', 'listen', 'port']:
return
if key in ['blocked_instances', 'blocked_software', 'whitelist']: if key in ['blocked_instances', 'blocked_software', 'whitelist']:
assert isinstance(value, (list, set, tuple)) assert isinstance(value, (list, set, tuple))
elif key in ['port', 'json', 'objects', 'digests']: elif key in ['port', 'workers', 'json_cache', 'timeout']:
assert isinstance(value, (int)) if not isinstance(value, int):
value = int(value)
elif key == 'whitelist_enabled': elif key == 'whitelist_enabled':
assert isinstance(value, bool) if not isinstance(value, bool):
value = boolean(value)
super().__setitem__(key, value) super().__setitem__(key, value)
@ -84,6 +75,11 @@ class RelayConfig(DotDict):
return f'{self.actor}#main-key' return f'{self.actor}#main-key'
@cached_property
def is_docker(self):
return bool(os.environ.get('DOCKER_RUNNING'))
def reset(self): def reset(self):
self.clear() self.clear()
self.update({ self.update({
@ -92,16 +88,17 @@ class RelayConfig(DotDict):
'port': 8080, 'port': 8080,
'note': 'Make a note about your instance here.', 'note': 'Make a note about your instance here.',
'push_limit': 512, 'push_limit': 512,
'json_cache': 1024,
'timeout': 10,
'workers': 0,
'host': 'relay.example.com', 'host': 'relay.example.com',
'whitelist_enabled': False,
'blocked_software': [], 'blocked_software': [],
'blocked_instances': [], 'blocked_instances': [],
'whitelist': [], 'whitelist': []
'whitelist_enabled': False,
'json': 1024,
'objects': 1024,
'digests': 1024
}) })
def ban_instance(self, instance): def ban_instance(self, instance):
if instance.startswith('http'): if instance.startswith('http'):
instance = urlparse(instance).hostname instance = urlparse(instance).hostname
@ -208,13 +205,15 @@ class RelayConfig(DotDict):
return False return False
for key, value in config.items(): for key, value in config.items():
if key in ['ap', 'cache']: if key in ['ap']:
for k, v in value.items(): for k, v in value.items():
if k not in self: if k not in self:
continue continue
self[k] = v self[k] = v
continue
elif key not in self: elif key not in self:
continue continue
@ -228,13 +227,16 @@ class RelayConfig(DotDict):
def save(self): def save(self):
config = { config = {
'db': self['db'], # just turning config.db into a string is good enough for now
'db': str(self.db),
'listen': self.listen, 'listen': self.listen,
'port': self.port, 'port': self.port,
'note': self.note, 'note': self.note,
'push_limit': self.push_limit, 'push_limit': self.push_limit,
'ap': {key: self[key] for key in self.apkeys}, 'workers': self.workers,
'cache': {key: self[key] for key in self.cachekeys} 'json_cache': self.json_cache,
'timeout': self.timeout,
'ap': {key: self[key] for key in APKEYS}
} }
with open(self._path, 'w') as fd: with open(self._path, 'w') as fd:

View file

@ -1,8 +1,9 @@
import aputils
import asyncio
import json import json
import logging import logging
import traceback import traceback
from Crypto.PublicKey import RSA
from urllib.parse import urlparse from urllib.parse import urlparse
@ -16,22 +17,7 @@ class RelayDatabase(dict):
}) })
self.config = config self.config = config
self.PRIVKEY = None self.signer = None
@property
def PUBKEY(self):
return self.PRIVKEY.publickey()
@property
def pubkey(self):
return self.PUBKEY.exportKey('PEM').decode('utf-8')
@property
def privkey(self):
return self['private-key']
@property @property
@ -44,11 +30,6 @@ class RelayDatabase(dict):
return tuple(data['inbox'] for data in self['relay-list'].values()) return tuple(data['inbox'] for data in self['relay-list'].values())
def generate_key(self):
self.PRIVKEY = RSA.generate(4096)
self['private-key'] = self.PRIVKEY.exportKey('PEM').decode('utf-8')
def load(self): def load(self):
new_db = True new_db = True
@ -68,6 +49,7 @@ class RelayDatabase(dict):
for item in data.get('relay-list', []): for item in data.get('relay-list', []):
domain = urlparse(item).hostname domain = urlparse(item).hostname
self['relay-list'][domain] = { self['relay-list'][domain] = {
'domain': domain,
'inbox': item, 'inbox': item,
'followid': None 'followid': None
} }
@ -75,9 +57,13 @@ class RelayDatabase(dict):
else: else:
self['relay-list'] = data.get('relay-list', {}) self['relay-list'] = data.get('relay-list', {})
for domain in self['relay-list'].keys(): for domain, instance in self['relay-list'].items():
if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)): if self.config.is_banned(domain) or (self.config.whitelist_enabled and not self.config.is_whitelisted(domain)):
self.del_inbox(domain) self.del_inbox(domain)
continue
if not instance.get('domain'):
instance['domain'] = domain
new_db = False new_db = False
@ -88,12 +74,13 @@ class RelayDatabase(dict):
if self.config.db.stat().st_size > 0: if self.config.db.stat().st_size > 0:
raise e from None raise e from None
if not self.privkey: if not self['private-key']:
logging.info("No actor keys present, generating 4096-bit RSA keypair.") logging.info("No actor keys present, generating 4096-bit RSA keypair.")
self.generate_key() self.signer = aputils.Signer.new(self.config.keyid, size=4096)
self['private-key'] = self.signer.export()
else: else:
self.PRIVKEY = RSA.importKey(self.privkey) self.signer = aputils.Signer(self['private-key'], self.config.keyid)
self.save() self.save()
return not new_db return not new_db
@ -108,29 +95,34 @@ class RelayDatabase(dict):
if domain.startswith('http'): if domain.startswith('http'):
domain = urlparse(domain).hostname domain = urlparse(domain).hostname
if domain not in self['relay-list']: inbox = self['relay-list'].get(domain)
if fail:
raise KeyError(domain)
return if inbox:
return inbox
return self['relay-list'][domain] if fail:
raise KeyError(domain)
def add_inbox(self, inbox, followid=None, fail=False): def add_inbox(self, inbox, followid=None, software=None):
assert inbox.startswith('https'), 'Inbox must be a url' assert inbox.startswith('https'), 'Inbox must be a url'
domain = urlparse(inbox).hostname domain = urlparse(inbox).hostname
instance = self.get_inbox(domain)
if self.get_inbox(domain): if instance:
if fail: if followid:
raise KeyError(domain) instance['followid'] = followid
return False if software:
instance['software'] = software
return instance
self['relay-list'][domain] = { self['relay-list'][domain] = {
'domain': domain, 'domain': domain,
'inbox': inbox, 'inbox': inbox,
'followid': followid 'followid': followid,
'software': software
} }
logging.verbose(f'Added inbox to database: {inbox}') logging.verbose(f'Added inbox to database: {inbox}')
@ -158,11 +150,6 @@ class RelayDatabase(dict):
return False return False
def set_followid(self, domain, followid):
data = self.get_inbox(domain, fail=True)
data['followid'] = followid
def get_request(self, domain, fail=True): def get_request(self, domain, fail=True):
if domain.startswith('http'): if domain.startswith('http'):
domain = urlparse(domain).hostname domain = urlparse(domain).hostname
@ -197,3 +184,14 @@ class RelayDatabase(dict):
domain = urlparse(inbox).hostname domain = urlparse(inbox).hostname
del self['follow-requests'][domain] del self['follow-requests'][domain]
def distill_inboxes(self, message):
src_domains = {
message.domain,
urlparse(message.objectid).netloc
}
for domain, instance in self['relay-list'].items():
if domain not in src_domains:
yield instance['inbox']

192
relay/http_client.py Normal file
View file

@ -0,0 +1,192 @@
import logging
import traceback
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectorError, ServerTimeoutError
from aputils import Nodeinfo, WellKnownNodeinfo
from datetime import datetime
from cachetools import LRUCache
from json.decoder import JSONDecodeError
from urllib.parse import urlparse
from . import __version__
from .misc import (
MIMETYPES,
DotDict,
Message
)
HEADERS = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': f'ActivityRelay/{__version__}'
}
class Cache(LRUCache):
def set_maxsize(self, value):
self.__maxsize = int(value)
class HttpClient:
def __init__(self, database, limit=100, timeout=10, cache_size=1024):
self.database = database
self.cache = Cache(cache_size)
self.cfg = {'limit': limit, 'timeout': timeout}
self._conn = None
self._session = None
@property
def limit(self):
return self.cfg['limit']
@property
def timeout(self):
return self.cfg['timeout']
async def open(self):
if self._session:
return
self._conn = TCPConnector(
limit = self.limit,
ttl_dns_cache = 300,
)
self._session = ClientSession(
connector = self._conn,
headers = HEADERS,
connector_owner = True,
timeout = ClientTimeout(total=self.timeout)
)
async def close(self):
if not self._session:
return
await self._session.close()
await self._conn.close()
self._conn = None
self._session = None
async def get(self, url, sign_headers=False, loads=None, force=False):
await self.open()
try: url, _ = url.split('#', 1)
except: pass
if not force and url in self.cache:
return self.cache[url]
headers = {}
if sign_headers:
headers.update(self.database.signer.sign_headers('GET', url, algorithm='original'))
try:
logging.verbose(f'Fetching resource: {url}')
async with self._session.get(url, headers=headers) as resp:
## Not expecting a response with 202s, so just return
if resp.status == 202:
return
elif resp.status != 200:
logging.verbose(f'Received error when requesting {url}: {resp.status}')
logging.verbose(await resp.read()) # change this to debug
return
if loads:
message = await resp.json(loads=loads)
elif resp.content_type == MIMETYPES['activity']:
message = await resp.json(loads=Message.new_from_json)
elif resp.content_type == MIMETYPES['json']:
message = await resp.json(loads=DotDict.new_from_json)
else:
# todo: raise TypeError or something
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp.read()}')
logging.debug(f'{url} >> resp {message.to_json(4)}')
self.cache[url] = message
return message
except JSONDecodeError:
logging.verbose(f'Failed to parse JSON')
except (ClientConnectorError, ServerTimeoutError):
logging.verbose(f'Failed to connect to {urlparse(url).netloc}')
except Exception as e:
traceback.print_exc()
raise e
async def post(self, url, message):
await self.open()
instance = self.database.get_inbox(url)
## Using the old algo by default is probably a better idea right now
if instance and instance.get('software') in {'mastodon'}:
algorithm = 'hs2019'
else:
algorithm = 'original'
headers = {'Content-Type': 'application/activity+json'}
headers.update(self.database.signer.sign_headers('POST', url, message, algorithm=algorithm))
try:
logging.verbose(f'Sending "{message.type}" to {url}')
async with self._session.post(url, headers=headers, data=message.to_json()) as resp:
## Not expecting a response, so just return
if resp.status in {200, 202}:
return logging.verbose(f'Successfully sent "{message.type}" to {url}')
logging.verbose(f'Received error when pushing to {url}: {resp.status}')
return logging.verbose(await resp.read()) # change this to debug
except (ClientConnectorError, ServerTimeoutError):
logging.verbose(f'Failed to connect to {url}')
## prevent workers from being brought down
except Exception as e:
traceback.print_exc()
## Additional methods ##
async def fetch_nodeinfo(self, domain):
nodeinfo_url = None
wk_nodeinfo = await self.get(
f'https://{domain}/.well-known/nodeinfo',
loads = WellKnownNodeinfo.new_from_json
)
if not wk_nodeinfo:
logging.verbose(f'Failed to fetch well-known nodeinfo url for domain: {domain}')
return False
for version in ['20', '21']:
try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url:
logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
return await self.get(nodeinfo_url, loads=Nodeinfo.new_from_json) or False

View file

@ -1,68 +0,0 @@
import logging
import aiohttp
from collections import defaultdict
STATS = {
'requests': defaultdict(int),
'response_codes': defaultdict(int),
'response_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_codes': defaultdict(int),
'delivery_codes_per_domain': defaultdict(lambda: defaultdict(int)),
'exceptions': defaultdict(int),
'exceptions_per_domain': defaultdict(lambda: defaultdict(int)),
'delivery_exceptions': defaultdict(int),
'delivery_exceptions_per_domain': defaultdict(lambda: defaultdict(int))
}
async def on_request_start(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP START [%r], [%r]", session, params)
STATS['requests'][params.url.host] += 1
async def on_request_end(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP END [%r], [%r]", session, params)
host = params.url.host
status = params.response.status
STATS['response_codes'][status] += 1
STATS['response_codes_per_domain'][host][status] += 1
if params.method == 'POST':
STATS['delivery_codes'][status] += 1
STATS['delivery_codes_per_domain'][host][status] += 1
async def on_request_exception(session, trace_config_ctx, params):
global STATS
logging.debug("HTTP EXCEPTION [%r], [%r]", session, params)
host = params.url.host
exception = repr(params.exception)
STATS['exceptions'][exception] += 1
STATS['exceptions_per_domain'][host][exception] += 1
if params.method == 'POST':
STATS['delivery_exceptions'][exception] += 1
STATS['delivery_exceptions_per_domain'][host][exception] += 1
def http_debug():
if logging.DEBUG >= logging.root.level:
return
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
trace_config.on_request_exception.append(on_request_exception)
return [trace_config]

View file

@ -8,10 +8,11 @@ from urllib.parse import urlparse
from . import misc, __version__ from . import misc, __version__
from .application import Application from .application import Application
from .config import relay_software_names from .config import RELAY_SOFTWARE
app = None app = None
CONFIG_IGNORE = {'blocked_software', 'blocked_instances', 'whitelist'}
@click.group('cli', context_settings={'show_default': True}, invoke_without_command=True) @click.group('cli', context_settings={'show_default': True}, invoke_without_command=True)
@ -24,15 +25,95 @@ def cli(ctx, config):
if not ctx.invoked_subcommand: if not ctx.invoked_subcommand:
if app.config.host.endswith('example.com'): if app.config.host.endswith('example.com'):
relay_setup.callback() cli_setup.callback()
else: else:
relay_run.callback() cli_run.callback()
@cli.command('setup')
def cli_setup():
'Generate a new config'
while True:
app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host)
if not app.config.host.endswith('example.com'):
break
click.echo('The domain must not be example.com')
if not app.config.is_docker:
app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
while True:
app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
break
app.config.save()
if not app.config.is_docker and click.confirm('Relay all setup! Would you like to run it now?'):
cli_run.callback()
@cli.command('run')
def cli_run():
'Run the relay'
if app.config.host.endswith('example.com'):
return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ == '2.6.1':
if int(vers_split[1]) > 7:
click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
return click.echo(pip_command)
else:
click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
return click.echo(pip_command)
if not misc.check_open_port(app.config.listen, app.config.port):
return click.echo(f'Error: A server is already running on port {app.config.port}')
app.run()
# todo: add config default command for resetting config key
@cli.group('config')
def cli_config():
'Manage the relay config'
pass
@cli_config.command('list')
def cli_config_list():
'List the current relay config'
click.echo('Relay Config:')
for key, value in app.config.items():
if key not in CONFIG_IGNORE:
key = f'{key}:'.ljust(20)
click.echo(f'- {key} {value}')
@cli_config.command('set')
@click.argument('key')
@click.argument('value')
def cli_config_set(key, value):
'Set a config value'
app.config[key] = value
app.config.save()
print(f'{key}: {app.config[key]}')
@cli.group('inbox') @cli.group('inbox')
@click.pass_context def cli_inbox():
def cli_inbox(ctx):
'Manage the inboxes in the database' 'Manage the inboxes in the database'
pass pass
@ -67,15 +148,19 @@ def cli_inbox_follow(actor):
inbox = inbox_data['inbox'] inbox = inbox_data['inbox']
except KeyError: except KeyError:
actor_data = asyncio.run(misc.request(actor)) actor_data = asyncio.run(app.client.get(actor, sign_headers=True))
if not actor_data:
return click.echo(f'Failed to fetch actor: {actor}')
inbox = actor_data.shared_inbox inbox = actor_data.shared_inbox
message = misc.Message.new_follow( message = misc.Message.new_follow(
host = app.config.host, host = app.config.host,
actor = actor.id actor = actor
) )
asyncio.run(misc.request(inbox, message)) asyncio.run(app.client.post(inbox, message))
click.echo(f'Sent follow message to actor: {actor}') click.echo(f'Sent follow message to actor: {actor}')
@ -101,7 +186,7 @@ def cli_inbox_unfollow(actor):
) )
except KeyError: except KeyError:
actor_data = asyncio.run(misc.request(actor)) actor_data = asyncio.run(app.client.get(actor, sign_headers=True))
inbox = actor_data.shared_inbox inbox = actor_data.shared_inbox
message = misc.Message.new_unfollow( message = misc.Message.new_unfollow(
host = app.config.host, host = app.config.host,
@ -113,7 +198,7 @@ def cli_inbox_unfollow(actor):
} }
) )
asyncio.run(misc.request(inbox, message)) asyncio.run(app.client.post(inbox, message))
click.echo(f'Sent unfollow message to: {actor}') click.echo(f'Sent unfollow message to: {actor}')
@ -128,11 +213,13 @@ def cli_inbox_add(inbox):
if app.config.is_banned(inbox): if app.config.is_banned(inbox):
return click.echo(f'Error: Refusing to add banned inbox: {inbox}') return click.echo(f'Error: Refusing to add banned inbox: {inbox}')
if app.database.add_inbox(inbox): if app.database.get_inbox(inbox):
app.database.save() return click.echo(f'Error: Inbox already in database: {inbox}')
return click.echo(f'Added inbox to the database: {inbox}')
click.echo(f'Error: Inbox already in database: {inbox}') app.database.add_inbox(inbox)
app.database.save()
click.echo(f'Added inbox to the database: {inbox}')
@cli_inbox.command('remove') @cli_inbox.command('remove')
@ -228,21 +315,21 @@ def cli_software_ban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to ban relays' 'Ban software. Use RELAYS for NAME to ban relays'
if name == 'RELAYS': if name == 'RELAYS':
for name in relay_software_names: for name in RELAY_SOFTWARE:
app.config.ban_software(name) app.config.ban_software(name)
app.config.save() app.config.save()
return click.echo('Banned all relay software') return click.echo('Banned all relay software')
if fetch_nodeinfo: if fetch_nodeinfo:
software = asyncio.run(misc.fetch_nodeinfo(name)) nodeinfo = asyncio.run(app.client.fetch_nodeinfo(name))
if not software: if not nodeinfo:
click.echo(f'Failed to fetch software name from domain: {name}') click.echo(f'Failed to fetch software name from domain: {name}')
name = software name = nodeinfo.sw_name
if config.ban_software(name): if app.config.ban_software(name):
app.config.save() app.config.save()
return click.echo(f'Banned software: {name}') return click.echo(f'Banned software: {name}')
@ -258,19 +345,19 @@ def cli_software_unban(name, fetch_nodeinfo):
'Ban software. Use RELAYS for NAME to unban relays' 'Ban software. Use RELAYS for NAME to unban relays'
if name == 'RELAYS': if name == 'RELAYS':
for name in relay_software_names: for name in RELAY_SOFTWARE:
app.config.unban_software(name) app.config.unban_software(name)
config.save() app.config.save()
return click.echo('Unbanned all relay software') return click.echo('Unbanned all relay software')
if fetch_nodeinfo: if fetch_nodeinfo:
software = asyncio.run(misc.fetch_nodeinfo(name)) nodeinfo = asyncio.run(app.client.fetch_nodeinfo(name))
if not software: if not nodeinfo:
click.echo(f'Failed to fetch software name from domain: {name}') click.echo(f'Failed to fetch software name from domain: {name}')
name = software name = nodeinfo.sw_name
if app.config.unban_software(name): if app.config.unban_software(name):
app.config.save() app.config.save()
@ -279,7 +366,6 @@ def cli_software_unban(name, fetch_nodeinfo):
click.echo(f'Software wasn\'t banned: {name}') click.echo(f'Software wasn\'t banned: {name}')
@cli.group('whitelist') @cli.group('whitelist')
def cli_whitelist(): def cli_whitelist():
'Manage the instance whitelist' 'Manage the instance whitelist'
@ -288,6 +374,8 @@ def cli_whitelist():
@cli_whitelist.command('list') @cli_whitelist.command('list')
def cli_whitelist_list(): def cli_whitelist_list():
'List all the instances in the whitelist'
click.echo('Current whitelisted domains') click.echo('Current whitelisted domains')
for domain in app.config.whitelist: for domain in app.config.whitelist:
@ -317,59 +405,18 @@ def cli_whitelist_remove(instance):
app.config.save() app.config.save()
if app.config.whitelist_enabled: if app.config.whitelist_enabled:
if app.database.del_inbox(inbox): if app.database.del_inbox(instance):
app.database.save() app.database.save()
click.echo(f'Removed instance from the whitelist: {instance}') click.echo(f'Removed instance from the whitelist: {instance}')
@cli.command('setup') @cli_whitelist.command('import')
def relay_setup(): def cli_whitelist_import():
'Generate a new config' 'Add all current inboxes to the whitelist'
while True: for domain in app.database.hostnames:
app.config.host = click.prompt('What domain will the relay be hosted on?', default=app.config.host) cli_whitelist_add.callback(domain)
if not app.config.host.endswith('example.com'):
break
click.echo('The domain must not be example.com')
app.config.listen = click.prompt('Which address should the relay listen on?', default=app.config.listen)
while True:
app.config.port = click.prompt('What TCP port should the relay listen on?', default=app.config.port, type=int)
break
app.config.save()
if not app['is_docker'] and click.confirm('Relay all setup! Would you like to run it now?'):
relay_run.callback()
@cli.command('run')
def relay_run():
'Run the relay'
if app.config.host.endswith('example.com'):
return click.echo('Relay is not set up. Please edit your relay config or run "activityrelay setup".')
vers_split = platform.python_version().split('.')
pip_command = 'pip3 uninstall pycrypto && pip3 install pycryptodome'
if Crypto.__version__ == '2.6.1':
if int(vers_split[1]) > 7:
click.echo('Error: PyCrypto is broken on Python 3.8+. Please replace it with pycryptodome before running again. Exiting...')
return click.echo(pip_command)
else:
click.echo('Warning: PyCrypto is old and should be replaced with pycryptodome')
return click.echo(pip_command)
if not misc.check_open_port(app.config.listen, app.config.port):
return click.echo(f'Error: A server is already running on port {app.config.port}')
app.run()
def main(): def main():

View file

@ -1,3 +1,4 @@
import aputils
import asyncio import asyncio
import base64 import base64
import json import json
@ -6,10 +7,6 @@ import socket
import traceback import traceback
import uuid import uuid
from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from aiohttp import ClientSession
from aiohttp.hdrs import METH_ALL as METHODS from aiohttp.hdrs import METH_ALL as METHODS
from aiohttp.web import Response as AiohttpResponse, View as AiohttpView from aiohttp.web import Response as AiohttpResponse, View as AiohttpView
from datetime import datetime from datetime import datetime
@ -17,17 +14,9 @@ from json.decoder import JSONDecodeError
from urllib.parse import urlparse from urllib.parse import urlparse
from uuid import uuid4 from uuid import uuid4
from .http_debug import http_debug
app = None app = None
HASHES = {
'sha1': SHA,
'sha256': SHA256,
'sha512': SHA512
}
MIMETYPES = { MIMETYPES = {
'activity': 'application/activity+json', 'activity': 'application/activity+json',
'html': 'text/html', 'html': 'text/html',
@ -46,8 +35,35 @@ def set_app(new_app):
app = new_app app = new_app
def build_signing_string(headers, used_headers): def boolean(value):
return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), used_headers)) if isinstance(value, str):
if value.lower() in ['on', 'y', 'yes', 'true', 'enable', 'enabled', '1']:
return True
elif value.lower() in ['off', 'n', 'no', 'false', 'disable', 'disable', '0']:
return False
else:
raise TypeError(f'Cannot parse string "{value}" as a boolean')
elif isinstance(value, int):
if value == 1:
return True
elif value == 0:
return False
else:
raise ValueError('Integer value must be 1 or 0')
elif value == None:
return False
try:
return value.__bool__()
except AttributeError:
raise TypeError(f'Cannot convert object of type "{clsname(value)}"')
def check_open_port(host, port): def check_open_port(host, port):
@ -62,203 +78,6 @@ def check_open_port(host, port):
return False return False
def create_signature_header(headers):
headers = {k.lower(): v for k, v in headers.items()}
used_headers = headers.keys()
sigstring = build_signing_string(headers, used_headers)
sig = {
'keyId': app.config.keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join(used_headers),
'signature': sign_signing_string(sigstring, app.database.PRIVKEY)
}
chunks = ['{}="{}"'.format(k, v) for k, v in sig.items()]
return ','.join(chunks)
def distill_inboxes(actor, object_id):
for inbox in app.database.inboxes:
if inbox != actor.shared_inbox and urlparse(inbox).hostname != urlparse(object_id).hostname:
yield inbox
def generate_body_digest(body):
bodyhash = app.cache.digests.get(body)
if bodyhash:
return bodyhash
h = SHA256.new(body.encode('utf-8'))
bodyhash = base64.b64encode(h.digest()).decode('utf-8')
app.cache.digests[body] = bodyhash
return bodyhash
def sign_signing_string(sigstring, key):
pkcs = PKCS1_v1_5.new(key)
h = SHA256.new()
h.update(sigstring.encode('ascii'))
sigdata = pkcs.sign(h)
return base64.b64encode(sigdata).decode('utf-8')
async def fetch_actor_key(actor):
actor_data = await request(actor)
if not actor_data:
return None
try:
return RSA.importKey(actor_data['publicKey']['publicKeyPem'])
except Exception as e:
logging.debug(f'Exception occured while fetching actor key: {e}')
async def fetch_nodeinfo(domain):
nodeinfo_url = None
wk_nodeinfo = await request(f'https://{domain}/.well-known/nodeinfo', sign_headers=False, activity=False)
if not wk_nodeinfo:
return
wk_nodeinfo = WKNodeinfo(wk_nodeinfo)
for version in ['20', '21']:
try:
nodeinfo_url = wk_nodeinfo.get_url(version)
except KeyError:
pass
if not nodeinfo_url:
logging.verbose(f'Failed to fetch nodeinfo url for domain: {domain}')
return False
nodeinfo = await request(nodeinfo_url, sign_headers=False, activity=False)
try:
return nodeinfo['software']['name']
except KeyError:
return False
async def request(uri, data=None, force=False, sign_headers=True, activity=True):
## If a get request and not force, try to use the cache first
if not data and not force:
try:
return app.cache.json[uri]
except KeyError:
pass
url = urlparse(uri)
method = 'POST' if data else 'GET'
action = data.get('type') if data else None
headers = {
'Accept': f'{MIMETYPES["activity"]}, {MIMETYPES["json"]};q=0.9',
'User-Agent': 'ActivityRelay',
}
if data:
headers['Content-Type'] = MIMETYPES['activity' if activity else 'json']
if sign_headers:
signing_headers = {
'(request-target)': f'{method.lower()} {url.path}',
'Date': datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'),
'Host': url.netloc
}
if data:
assert isinstance(data, dict)
data = json.dumps(data)
signing_headers.update({
'Digest': f'SHA-256={generate_body_digest(data)}',
'Content-Length': str(len(data.encode('utf-8')))
})
signing_headers['Signature'] = create_signature_header(signing_headers)
del signing_headers['(request-target)']
del signing_headers['Host']
headers.update(signing_headers)
try:
if data:
logging.verbose(f'Sending "{action}" to inbox: {uri}')
else:
logging.verbose(f'Sending GET request to url: {uri}')
async with ClientSession(trace_configs=http_debug()) as session, app.semaphore:
async with session.request(method, uri, headers=headers, data=data) as resp:
## aiohttp has been known to leak if the response hasn't been read,
## so we're just gonna read the request no matter what
resp_data = await resp.read()
## Not expecting a response, so just return
if resp.status == 202:
return
elif resp.status != 200:
if not resp_data:
return logging.verbose(f'Received error when requesting {uri}: {resp.status} {resp_data}')
return logging.verbose(f'Received error when sending {action} to {uri}: {resp.status} {resp_data}')
if resp.content_type == MIMETYPES['activity']:
resp_data = await resp.json(loads=Message.new_from_json)
elif resp.content_type == MIMETYPES['json']:
resp_data = await resp.json(loads=DotDict.new_from_json)
else:
logging.verbose(f'Invalid Content-Type for "{url}": {resp.content_type}')
return logging.debug(f'Response: {resp_data}')
logging.debug(f'{uri} >> resp {resp_data}')
app.cache.json[uri] = resp_data
return resp_data
except JSONDecodeError:
return
except Exception:
traceback.print_exc()
async def validate_signature(actor, signature, http_request):
headers = {key.lower(): value for key, value in http_request.headers.items()}
headers['(request-target)'] = ' '.join([http_request.method.lower(), http_request.path])
sigstring = build_signing_string(headers, signature['headers'])
logging.debug(f'sigstring: {sigstring}')
sign_alg, _, hash_alg = signature['algorithm'].partition('-')
logging.debug(f'sign alg: {sign_alg}, hash alg: {hash_alg}')
sigdata = base64.b64decode(signature['signature'])
pkcs = PKCS1_v1_5.new(actor.PUBKEY)
h = HASHES[hash_alg].new()
h.update(sigstring.encode('ascii'))
result = pkcs.verify(h, sigdata)
http_request['validated'] = result
logging.debug(f'validates? {result}')
return result
class DotDict(dict): class DotDict(dict):
def __init__(self, _data, **kwargs): def __init__(self, _data, **kwargs):
dict.__init__(self) dict.__init__(self)
@ -427,16 +246,6 @@ class Message(DotDict):
# actor properties # actor properties
@property
def PUBKEY(self):
return RSA.import_key(self.pubkey)
@property
def pubkey(self):
return self.publicKey.publicKeyPem
@property @property
def shared_inbox(self): def shared_inbox(self):
return self.get('endpoints', {}).get('sharedInbox', self.inbox) return self.get('endpoints', {}).get('sharedInbox', self.inbox)
@ -459,6 +268,11 @@ class Message(DotDict):
return self.object return self.object
@property
def signer(self):
return aputils.Signer.new_from_actor(self)
class Response(AiohttpResponse): class Response(AiohttpResponse):
@classmethod @classmethod
def new(cls, body='', status=200, headers=None, ctype='text'): def new(cls, body='', status=200, headers=None, ctype='text'):
@ -516,11 +330,6 @@ class View(AiohttpView):
return self._request.app return self._request.app
@property
def cache(self):
return self.app.cache
@property @property
def config(self): def config(self):
return self.app.config return self.app.config
@ -529,22 +338,3 @@ class View(AiohttpView):
@property @property
def database(self): def database(self):
return self.app.database return self.app.database
class WKNodeinfo(DotDict):
@classmethod
def new(cls, v20, v21):
return cls({
'links': [
{'rel': NODEINFO_NS['20'], 'href': v20},
{'rel': NODEINFO_NS['21'], 'href': v21}
]
})
def get_url(self, version='20'):
for item in self.links:
if item['rel'] == NODEINFO_NS[version]:
return item['href']
raise KeyError(version)

View file

@ -1,64 +1,104 @@
import asyncio import asyncio
import logging import logging
from cachetools import LRUCache
from uuid import uuid4 from uuid import uuid4
from . import misc from .misc import Message
async def handle_relay(request, actor, data, software): cache = LRUCache(1024)
if data.objectid in request.app.cache.objects:
logging.verbose(f'already relayed {data.objectid}')
def person_check(actor, software):
## pleroma and akkoma use Person for the actor type for some reason
if software in {'akkoma', 'pleroma'} and actor.id != f'https://{actor.domain}/relay':
return True
## make sure the actor is an application
elif actor.type != 'Application':
return True
async def handle_relay(request):
if request.message.objectid in cache:
logging.verbose(f'already relayed {request.message.objectid}')
return return
logging.verbose(f'Relaying post from {data.actorid}') message = Message.new_announce(
host = request.config.host,
message = misc.Message.new_announce( object = request.message.objectid
host = request.app.config.host,
object = data.objectid
) )
cache[request.message.objectid] = message.id
logging.debug(f'>> relay: {message}') logging.debug(f'>> relay: {message}')
inboxes = misc.distill_inboxes(actor, data.objectid) inboxes = request.database.distill_inboxes(request.message)
futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) for inbox in inboxes:
request.app.cache.objects[data.objectid] = message.id request.app.push_message(inbox, message)
async def handle_forward(request, actor, data, software): async def handle_forward(request):
if data.id in request.app.cache.objects: if request.message.id in cache:
logging.verbose(f'already forwarded {data.id}') logging.verbose(f'already forwarded {request.message.id}')
return return
message = misc.Message.new_announce( message = Message.new_announce(
host = request.app.config.host, host = request.config.host,
object = data object = request.message
) )
logging.verbose(f'Forwarding post from {actor.id}') cache[request.message.id] = message.id
logging.debug(f'>> Relay {data}') logging.debug(f'>> forward: {message}')
inboxes = misc.distill_inboxes(actor, data.id) inboxes = request.database.distill_inboxes(request.message)
futures = [misc.request(inbox, data=message) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures)) for inbox in inboxes:
request.app.cache.objects[data.id] = message.id request.app.push_message(inbox, message)
async def handle_follow(request, actor, data, software): async def handle_follow(request):
if not request.app.database.add_inbox(actor.shared_inbox, data.id): nodeinfo = await request.app.client.fetch_nodeinfo(request.actor.domain)
request.app.database.set_followid(actor.id, data.id) software = nodeinfo.sw_name if nodeinfo else None
request.app.database.save() ## reject if software used by actor is banned
if request.config.is_banned_software(software):
request.app.push_message(
request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = False
)
)
await misc.request( return logging.verbose(f'Rejected follow from actor for using specific software: actor={request.actor.id}, software={software}')
actor.shared_inbox,
misc.Message.new_response( ## reject if the actor is not an instance actor
host = request.app.config.host, if person_check(request.actor, software):
actor = actor.id, request.app.push_message(
followid = data.id, request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = False
)
)
return logging.verbose(f'Non-application actor tried to follow: {request.actor.id}')
request.database.add_inbox(request.actor.shared_inbox, request.message.id, software)
request.database.save()
request.app.push_message(
request.actor.shared_inbox,
Message.new_response(
host = request.config.host,
actor = request.actor.id,
followid = request.message.id,
accept = True accept = True
) )
) )
@ -66,33 +106,34 @@ async def handle_follow(request, actor, data, software):
# Are Akkoma and Pleroma the only two that expect a follow back? # Are Akkoma and Pleroma the only two that expect a follow back?
# Ignoring only Mastodon for now # Ignoring only Mastodon for now
if software != 'mastodon': if software != 'mastodon':
await misc.request( request.app.push_message(
actor.shared_inbox, request.actor.shared_inbox,
misc.Message.new_follow( Message.new_follow(
host = request.app.config.host, host = request.config.host,
actor = actor.id actor = request.actor.id
) )
) )
async def handle_undo(request, actor, data, software): async def handle_undo(request):
## If the object is not a Follow, forward it ## If the object is not a Follow, forward it
if data['object']['type'] != 'Follow': if request.message.object.type != 'Follow':
return await handle_forward(request, actor, data, software) return await handle_forward(request)
if not request.app.database.del_inbox(actor.domain, data.id): if not request.database.del_inbox(request.actor.domain, request.message.id):
return return
request.app.database.save() request.database.save()
message = misc.Message.new_unfollow( request.app.push_message(
host = request.app.config.host, request.actor.shared_inbox,
actor = actor.id, Message.new_unfollow(
follow = data host = request.config.host,
actor = request.actor.id,
follow = request.message
)
) )
await misc.request(actor.shared_inbox, message)
processors = { processors = {
'Announce': handle_relay, 'Announce': handle_relay,
@ -104,9 +145,16 @@ processors = {
} }
async def run_processor(request, actor, data, software): async def run_processor(request):
if data.type not in processors: if request.message.type not in processors:
return return
logging.verbose(f'New "{data.type}" from actor: {actor.id}') if request.instance and not request.instance.get('software'):
return await processors[data.type](request, actor, data, software) nodeinfo = await request.app.client.fetch_nodeinfo(request.instance['domain'])
if nodeinfo:
request.instance['software'] = nodeinfo.sw_name
request.database.save()
logging.verbose(f'New "{request.message.type}" from actor: {request.actor.id}')
return await processors[request.message.type](request)

View file

@ -1,3 +1,5 @@
import aputils
import asyncio
import logging import logging
import subprocess import subprocess
import traceback import traceback
@ -5,8 +7,7 @@ import traceback
from pathlib import Path from pathlib import Path
from . import __version__, misc from . import __version__, misc
from .http_debug import STATS from .misc import DotDict, Message, Response
from .misc import DotDict, Message, Response, WKNodeinfo
from .processors import run_processor from .processors import run_processor
@ -33,10 +34,10 @@ def register_route(method, path):
@register_route('GET', '/') @register_route('GET', '/')
async def home(request): async def home(request):
targets = '<br>'.join(request.app.database.hostnames) targets = '<br>'.join(request.database.hostnames)
note = request.app.config.note note = request.config.note
count = len(request.app.database.hostnames) count = len(request.database.hostnames)
host = request.app.config.host host = request.config.host
text = f""" text = f"""
<html><head> <html><head>
@ -64,8 +65,8 @@ a:hover {{ color: #8AF; }}
@register_route('GET', '/actor') @register_route('GET', '/actor')
async def actor(request): async def actor(request):
data = Message.new_actor( data = Message.new_actor(
host = request.app.config.host, host = request.config.host,
pubkey = request.app.database.pubkey pubkey = request.database.signer.pubkey
) )
return Response.new(data, ctype='activity') return Response.new(data, ctype='activity')
@ -74,67 +75,74 @@ async def actor(request):
@register_route('POST', '/inbox') @register_route('POST', '/inbox')
@register_route('POST', '/actor') @register_route('POST', '/actor')
async def inbox(request): async def inbox(request):
config = request.app.config config = request.config
database = request.app.database database = request.database
## reject if missing signature header ## reject if missing signature header
try: if not request.signature:
signature = DotDict.new_from_signature(request.headers['signature'])
except KeyError:
logging.verbose('Actor missing signature header') logging.verbose('Actor missing signature header')
raise HTTPUnauthorized(body='missing signature') raise HTTPUnauthorized(body='missing signature')
## read message
try: try:
data = await request.json(loads=Message.new_from_json) request['message'] = await request.json(loads=Message.new_from_json)
## reject if there is no message
if not request.message:
logging.verbose('empty message')
return Response.new_error(400, 'missing message', 'json')
## reject if there is no actor in the message ## reject if there is no actor in the message
if 'actor' not in data: if 'actor' not in request.message:
logging.verbose('actor not in data') logging.verbose('actor not in message')
return Response.new_error(400, 'no actor in message', 'json') return Response.new_error(400, 'no actor in message', 'json')
except: except:
## this code should hopefully never get called
traceback.print_exc() traceback.print_exc()
logging.verbose('Failed to parse inbox message') logging.verbose('Failed to parse inbox message')
return Response.new_error(400, 'failed to parse message', 'json') return Response.new_error(400, 'failed to parse message', 'json')
actor = await misc.request(signature.keyid) request['actor'] = await request.app.client.get(request.signature.keyid, sign_headers=True)
software = await misc.fetch_nodeinfo(actor.domain)
## reject if actor is empty ## reject if actor is empty
if not actor: if not request.actor:
logging.verbose(f'Failed to fetch actor: {actor.id}') ## ld signatures aren't handled atm, so just ignore it
if request['message'].type == 'Delete':
logging.verbose(f'Instance sent a delete which cannot be handled')
return Response.new(status=202)
logging.verbose(f'Failed to fetch actor: {request.signature.keyid}')
return Response.new_error(400, 'failed to fetch actor', 'json') return Response.new_error(400, 'failed to fetch actor', 'json')
request['instance'] = request.database.get_inbox(request['actor'].inbox)
## reject if the actor isn't whitelisted while the whiltelist is enabled ## reject if the actor isn't whitelisted while the whiltelist is enabled
elif config.whitelist_enabled and not config.is_whitelisted(actor.domain): if config.whitelist_enabled and not config.is_whitelisted(request.actor.domain):
logging.verbose(f'Rejected actor for not being in the whitelist: {actor.id}') logging.verbose(f'Rejected actor for not being in the whitelist: {request.actor.id}')
return Response.new_error(403, 'access denied', 'json') return Response.new_error(403, 'access denied', 'json')
## reject if actor is banned ## reject if actor is banned
if request.app['config'].is_banned(actor.domain): if request.config.is_banned(request.actor.domain):
logging.verbose(f'Ignored request from banned actor: {actor.id}') logging.verbose(f'Ignored request from banned actor: {actor.id}')
return Response.new_error(403, 'access denied', 'json') return Response.new_error(403, 'access denied', 'json')
## reject if software used by actor is banned
if config.is_banned_software(software):
logging.verbose(f'Rejected actor for using specific software: {software}')
return Response.new_error(403, 'access denied', 'json')
## reject if the signature is invalid ## reject if the signature is invalid
if not (await misc.validate_signature(actor, signature, request)): try:
await request.actor.signer.validate_aiohttp_request(request)
except aputils.SignatureValidationError as e:
logging.verbose(f'signature validation failed for: {actor.id}') logging.verbose(f'signature validation failed for: {actor.id}')
return Response.new_error(401, 'signature check failed', 'json') logging.debug(str(e))
return Response.new_error(401, str(e), 'json')
## reject if activity type isn't 'Follow' and the actor isn't following ## reject if activity type isn't 'Follow' and the actor isn't following
if data['type'] != 'Follow' and not database.get_inbox(actor.domain): if request.message.type != 'Follow' and not database.get_inbox(request.actor.domain):
logging.verbose(f'Rejected actor for trying to post while not following: {actor.id}') logging.verbose(f'Rejected actor for trying to post while not following: {request.actor.id}')
return Response.new_error(401, 'access denied', 'json') return Response.new_error(401, 'access denied', 'json')
logging.debug(f">> payload {data}") logging.debug(f">> payload {request.message.to_json(4)}")
await run_processor(request, actor, data, software) asyncio.ensure_future(run_processor(request))
return Response.new(status=202) return Response.new(status=202)
@ -146,63 +154,38 @@ async def webfinger(request):
except KeyError: except KeyError:
return Response.new_error(400, 'missing \'resource\' query key', 'json') return Response.new_error(400, 'missing \'resource\' query key', 'json')
if subject != f'acct:relay@{request.app.config.host}': if subject != f'acct:relay@{request.config.host}':
return Response.new_error(404, 'user not found', 'json') return Response.new_error(404, 'user not found', 'json')
data = { data = aputils.Webfinger.new(
'subject': subject, handle = 'relay',
'aliases': [request.app.config.actor], domain = request.config.host,
'links': [ actor = request.config.actor
{'href': request.app.config.actor, 'rel': 'self', 'type': 'application/activity+json'},
{'href': request.app.config.actor, 'rel': 'self', 'type': 'application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"'}
]
}
return Response.new(data, ctype='json')
@register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
async def nodeinfo_2_0(request):
niversion = request.match_info['version'][:3]
data = {
'openRegistrations': not request.app.config.whitelist_enabled,
'protocols': ['activitypub'],
'services': {
'inbound': [],
'outbound': []
},
'software': {
'name': 'activityrelay',
'version': version
},
'usage': {
'localPosts': 0,
'users': {
'total': 1
}
},
'metadata': {
'peers': request.app.database.hostnames
},
'version': niversion
}
if version == '2.1':
data['software']['repository'] = 'https://git.pleroma.social/pleroma/relay'
return Response.new(data, ctype='json')
@register_route('GET', '/.well-known/nodeinfo')
async def nodeinfo_wellknown(request):
data = WKNodeinfo.new(
v20 = f'https://{request.app.config.host}/nodeinfo/2.0.json',
v21 = f'https://{request.app.config.host}/nodeinfo/2.1.json'
) )
return Response.new(data, ctype='json') return Response.new(data, ctype='json')
@register_route('GET', '/stats') @register_route('GET', '/nodeinfo/{version:\d.\d\.json}')
async def stats(request): async def nodeinfo(request):
return Response.new(STATS, ctype='json') niversion = request.match_info['version'][:3]
data = dict(
name = 'activityrelay',
version = version,
protocols = ['activitypub'],
open_regs = not request.config.whitelist_enabled,
users = 1,
metadata = {'peers': request.database.hostnames}
)
if niversion == '2.1':
data['repo'] = 'https://git.pleroma.social/pleroma/relay'
return Response.new(aputils.Nodeinfo.new(**data), ctype='json')
@register_route('GET', '/.well-known/nodeinfo')
async def nodeinfo_wellknown(request):
data = aputils.WellKnownNodeinfo.new_template(request.config.host)
return Response.new(data, ctype='json')

View file

@ -1 +1,5 @@
. aiohttp>=3.8.0
aputils@https://git.barkshark.xyz/barkshark/aputils/archive/0.1.3.tar.gz
cachetools>=5.2.0
click>=8.1.2
pyyaml>=6.0

View file

@ -1,6 +1,6 @@
[metadata] [metadata]
name = relay name = relay
version = 0.2.3 version = attr: relay.__version__
description = Generic LitePub relay (works with all LitePub consumers and Mastodon) description = Generic LitePub relay (works with all LitePub consumers and Mastodon)
long_description = file: README.md long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8 long_description_content_type = text/markdown; charset=UTF-8
@ -22,13 +22,8 @@ project_urls =
[options] [options]
zip_safe = False zip_safe = False
packages = find: packages = find:
install_requires = install_requires = file: requirements.txt
aiohttp >= 3.8.0 python_requires = >=3.7
cachetools >= 5.0.0
click >= 8.1.2
pycryptodome >= 3.14.1
PyYAML >= 5.0.0
python_requires = >=3.6
[options.extras_require] [options.extras_require]
dev = dev =